/*chunk table data structure */
#include "config.h"

#include <time.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <dirent.h>
#include <errno.h>
#include <limits.h>
#include <time.h>

#define DBG_SUBSYS S_LIBCONTROL

#include "sysy_lib.h"
#include "pool_proto.h"
#include "configure.h"
#include "adt.h"
#include "bmap.h"
#include "lich_md.h"
#include "net_table.h"
#include "table_proto.h"
#include "stor_ctl.h"
#include "md_map.h"
#include "lease.h"
#include "../storage/md_parent.h"
#include "../replica/replica.h"
#include "../chunk/chunk_cleanup.h"
#include "../storage/stor_rpc.h"
#include "md_proto.h"
#include "net_global.h"
#include "pool_rename.h"
#include "pool_rmvol.h"
#include "dbg.h"
#include "../../cluster/dispatch/dispatch.h"

/*
On-disk layout of a chunk table:
  -------------------------------------------------------------------------------------------------------------------------------------
  | replica_srv meta (4k) | chunk info (4k) | table info (4k) | table map (16k) | table data |
  -------------------------------------------------------------------------------------------------------------------------------------
*/

/*
 * Argument bundle named for a pool-listing operation. No user of this type is
 * visible in this part of the file, so the per-field notes below are inferred
 * from the field names only -- TODO confirm against the code that uses it.
 */
typedef struct {
        uint64_t offset;        /* presumably the current position in the listing */
        void *ptr;              /* presumably a cursor into the output buffer */
        int left;               /* presumably the space or entry count still available */
        int fd;                 /* presumably a file descriptor used by the listing */
} listpool_arg_t;

/*
 * Argument bundle named for a chunk iterator: a func2_t callback, a chunk id
 * called "parent" and an opaque pointer. The code that consumes it is not
 * visible in this part of the file -- TODO confirm how the fields are used.
 */
typedef struct {
        func2_t func2;          /* callback; presumably invoked per visited chunk */
        chkid_t parent;         /* presumably the id of the chunk being iterated under */
        void *arg;              /* opaque pointer; presumably forwarded to func2 */
} chunk_iterator_arg_t;

/* Short local alias for the pool entry type used throughout this file. */
typedef pool_proto_entry_t entry_t;

/* Defined in another translation unit. */
extern worker_handler_t jobtracker;


/* Forward declarations of helpers that are referenced before their definitions. */
static void __pool_proto_destroy(pool_proto_t *pool_proto);
STATIC int __pool_proto_load(pool_proto_t *pool_proto, const char *pool, const chkinfo_t *chkinfo);
STATIC int __pool_proto_load_extern(pool_proto_t *pool_proto, const chkinfo_t *__chkinfo, int new);
STATIC int __pool_proto_extend(pool_proto_t *pool_proto, const chkid_t *tableid, int op);
STATIC int __pool_proto_table(pool_proto_t *pool_proto, const chkid_t *tableid,
                             table_proto_t **_table);
STATIC int __pool_proto_chunk_check(pool_proto_t *pool_proto, const chkid_t *chkid);

/* Key-hash callback for string keys: returns hash_str() of the given C string. */
static uint32_t __name_key(const void *args)
{
        char *name = (char *)args;

        return hash_str(name);
}

static int __name_cmp(const void *v1, const void *v2)
{
        const entry_t *ent = (entry_t *)v1;

        DBUG("cmp %s %s\n", ent->name, (const char *)v2);

        return strcmp(ent->name, (const char *)v2);
}

/* Key-hash callback for chunk-id keys: the sum of the id and idx fields. */
static uint32_t __id_key(const void *args)
{
        const chkid_t *chkid = args;

        return chkid->id + chkid->idx;
}

static int __id_cmp(const void *v1, const void *v2)
{
        const entry_t *ent = (entry_t *)v1;

        DBUG("cmp "CHKID_FORMAT" : "CHKID_FORMAT"\n",
              CHKID_ARG(&ent->chkinfo->id), CHKID_ARG((const chkid_t *)v2));

        return chkid_cmp(&ent->chkinfo->id, (const chkid_t *)v2);
}

#if 0
STATIC int __pool_proto_wrlock(pool_proto_t *pool_proto)
{
        int ret;

        ret = sy_rwlock_wrlock(&pool_proto->rwlock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (pool_proto->ltime == 0) {
                ret = EAGAIN;
                GOTO(err_lock, ret);
        }

        return 0;
err_lock:
        sy_rwlock_unlock(&pool_proto->rwlock);
err_ret:
        return ret;
}

STATIC int __pool_proto_rdlock(pool_proto_t *pool_proto)
{
        int ret;

        ret = sy_rwlock_rdlock(&pool_proto->rwlock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (pool_proto->ltime == 0) {
                ret = EAGAIN;
                GOTO(err_lock, ret);
        }

        return 0;
err_lock:
        sy_rwlock_unlock(&pool_proto->rwlock);
err_ret:
        return ret;
}

STATIC int __pool_proto_unlock(pool_proto_t *pool_proto)
{
        return sy_rwlock_unlock(&pool_proto->rwlock);
}
#endif

/*
 * Allocate a new entry holding private copies of name and chkinfo together
 * with the given location.
 *
 * @param _ent    receives the new entry on success
 * @param name    NUL-terminated entry name (copied)
 * @param chkinfo chunk info (copied, CHKINFO_SIZE(repnum) bytes)
 * @param loc     location stored in the entry (copied by value)
 * @return 0 on success, otherwise the ymalloc() error; partial allocations
 *         are released before returning
 */
STATIC int __pool_proto_alloc(entry_t **_ent, const char *name, const chkinfo_t *chkinfo, const loc_t *loc)
{
        int ret;
        entry_t *entry;
        size_t name_size = strlen(name) + 1;    /* includes the trailing NUL */
        size_t info_size = CHKINFO_SIZE(chkinfo->repnum);

        ret = ymalloc((void **)&entry, sizeof(*entry));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = ymalloc((void **)&entry->name, name_size);
        if (unlikely(ret))
                GOTO(err_free1, ret);

        ret = ymalloc((void **)&entry->chkinfo, info_size);
        if (unlikely(ret))
                GOTO(err_free2, ret);

        memcpy(entry->name, name, name_size);
        memcpy(entry->chkinfo, chkinfo, info_size);
        entry->loc = *loc;

        *_ent = entry;

        return 0;
err_free2:
        yfree((void **)&entry->name);
err_free1:
        yfree((void **)&entry);
err_ret:
        return ret;
}

/*
 * Update an existing entry in place with a new name and/or new chunk info.
 *
 * @param ent     entry to update
 * @param name    new name, or NULL to keep the current one; the name buffer is
 *                resized only when the string length changes
 * @param chkinfo new chunk info, or NULL to keep the current one; the buffer
 *                is resized only when the replica count changes
 * @return 0 on success, otherwise the yrealloc() error
 */
STATIC int __pool_proto_realloc(entry_t *ent, const char *name, const chkinfo_t *chkinfo)
{
        int ret;

        if (name != NULL && strcmp(ent->name, name) != 0) {
                size_t old_len = strlen(ent->name);
                size_t new_len = strlen(name);

                if (old_len != new_len) {
                        ret = yrealloc((void **)&ent->name, old_len + 1, new_len + 1);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                }

                memcpy(ent->name, name, new_len + 1);
        }

        if (chkinfo != NULL) {
                if (ent->chkinfo->repnum != chkinfo->repnum) {
                        ret = yrealloc((void **)&ent->chkinfo,
                                       CHKINFO_SIZE(ent->chkinfo->repnum),
                                       CHKINFO_SIZE(chkinfo->repnum));
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                }

                memcpy(ent->chkinfo, chkinfo, CHKINFO_SIZE(chkinfo->repnum));
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Push a new chkinfo for this pool to its parent: looks up the parent id with
 * replica_srv_getparent() and then calls md_chunk_update() on it, passing the
 * local node id and the given info_version.
 *
 * @return 0 on success, otherwise the error of the failing call
 */
STATIC int __pool_proto_table_update_remote(pool_proto_t *pool_proto,
                                            const chkinfo_t *chkinfo, uint64_t info_version)
{
        int ret;
        fileid_t parentid;

        ret = replica_srv_getparent(&pool_proto->chkid, &parentid, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = md_chunk_update(pool_proto->pool, &parentid, chkinfo,
                              net_getnid(), info_version);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Refresh the cached chkinfo/chkstat of the pool's own table
 * (pool_proto->table_proto) from the caller's copies.
 *
 * When info_version is unchanged only chkstat is copied. Otherwise, if sync is
 * non-zero, the new chkinfo is first pushed with
 * __pool_proto_table_update_remote(); a failure there zeroes pool_proto->ltime
 * and is returned. After that chunk_cleanup_compare() is called with the old
 * and new chkinfo and both cached structures are overwritten.
 *
 * @return 0 on success, otherwise the error of the remote update
 */
STATIC int __pool_proto_update_dir__(pool_proto_t *pool_proto,
                                    const chkinfo_t *_chkinfo, const chkstat_t *_chkstat, int sync)
{
        int ret;
        table_proto_t *table = pool_proto->table_proto;
        chkinfo_t *cached_info;
        chkstat_t *cached_stat;

        YASSERT(table);
        cached_stat = table->chkstat;
        cached_info = table->chkinfo;

        /* the caller must pass copies, never the cached objects themselves */
        YASSERT(cached_info != _chkinfo);
        YASSERT(cached_stat != _chkstat);

        if (cached_info->info_version == _chkinfo->info_version) {
                memcpy(cached_stat, _chkstat, CHKSTAT_SIZE(_chkinfo->repnum));
                return 0;
        }

        if (sync) {
                DINFO("chunk "CHKID_FORMAT" update\n", CHKID_ARG(&cached_info->id));
                ret = __pool_proto_table_update_remote(pool_proto, _chkinfo,
                                                       cached_info->info_version);
                if (unlikely(ret)) {
                        DWARN("pool "CHKID_FORMAT" reset\n", CHKID_ARG(&pool_proto->chkid));
                        pool_proto->ltime = 0;
                        GOTO(err_ret, ret);
                }
        }

        chunk_cleanup_compare(pool_proto->pool, &pool_proto->chkid, cached_info, _chkinfo);

        YASSERT(chkid_cmp(&cached_info->id, &_chkinfo->id) == 0);
        memcpy(cached_info, _chkinfo, CHKINFO_SIZE(_chkinfo->repnum));
        memcpy(cached_stat, _chkstat, CHKSTAT_SIZE(_chkinfo->repnum));

        return 0;
err_ret:
        return ret;
}

/*
 * Refresh the cached chkinfo/chkstat of a sub-chunk table from the caller's
 * copies. The table is looked up by _chkinfo->id via __pool_proto_table().
 *
 * When info_version is unchanged only chkstat is copied. Otherwise, if sync is
 * non-zero, the new chkinfo is first written into slot chkid->idx of the
 * pool's own table; a failure there zeroes pool_proto->ltime and is returned.
 * After that chunk_cleanup_compare() is called with the old and new chkinfo
 * and both cached structures are overwritten.
 *
 * @return 0 on success, otherwise the error of the parent-table update
 */
STATIC int __pool_proto_update_extern__(pool_proto_t *pool_proto,
                                       const chkinfo_t *_chkinfo, const chkstat_t *_chkstat, int sync)
{
        int ret;
        table_proto_t *table_proto = NULL, *table_parent;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;
        char item[DIR_PROTO_ITEM_SIZE];
        const chkid_t *chkid;

        chkid = &_chkinfo->id;
        YASSERT(chkid->idx < DIR_PROTO_EXTERN_ITEM_COUNT);

        /* a missing table is treated as a fatal inconsistency, not an error return */
        ret = __pool_proto_table(pool_proto, chkid, &table_proto);
        YASSERT(ret == 0);

        chkstat = table_proto->chkstat;
        chkinfo = table_proto->chkinfo;

        /* the caller must pass copies, never the cached objects themselves */
        YASSERT(chkinfo != _chkinfo);
        YASSERT(chkstat != _chkstat);
        if (chkinfo->info_version == _chkinfo->info_version) {
                /* same version: only the replica status can have changed */
                memcpy(chkstat, _chkstat, CHKSTAT_SIZE(_chkinfo->repnum));
                goto out;
        }

        if (sync) {
                DINFO("chunk "CHKID_FORMAT" update\n", CHKID_ARG(&chkinfo->id));
                /* persist the new chkinfo into the parent table slot first */
                table_parent = pool_proto->table_proto;
                memcpy(item, _chkinfo, CHKINFO_SIZE(_chkinfo->repnum));
                ret = table_parent->update(table_parent, chkid->idx, item, CHKINFO_SIZE(_chkinfo->repnum));
                if (unlikely(ret)) {
                        DWARN("pool "CHKID_FORMAT" reset\n", CHKID_ARG(&pool_proto->chkid));
                        pool_proto->ltime = 0;
                        GOTO(err_ret, ret);
                }
        }

        /* called with old and new chkinfo before the cached copy is overwritten */
        chunk_cleanup_compare(pool_proto->pool, &pool_proto->chkid, chkinfo, _chkinfo);
        
        YASSERT(chkid_cmp(&chkinfo->id, &_chkinfo->id) == 0);
        memcpy(chkinfo, _chkinfo, CHKINFO_SIZE(_chkinfo->repnum));
        memcpy(chkstat, _chkstat, CHKSTAT_SIZE(_chkinfo->repnum));

out:
        return 0;
err_ret:
        return ret;
}

/**
 * When a chkinfo changes, whatever references it must be updated as well,
 * i.e. the related information kept in the parent chunk node of the chunk
 * tree. Dispatches on the chunk type: __POOL_CHUNK__ goes to
 * __pool_proto_update_dir__(), __POOL_SUB_CHUNK__ goes to
 * __pool_proto_update_extern__(); any other type trips an assertion.
 *
 * @param pool_proto pool the chunk belongs to
 * @param chkinfo    new chunk info
 * @param chkstat    new chunk status
 * @param sync       forwarded unchanged to the type-specific update helper
 * @return 0 on success, otherwise the error of the selected helper
 */
STATIC int __pool_proto_update_chkinfo_ref(pool_proto_t *pool_proto,
                                      const chkinfo_t *chkinfo, const chkstat_t *chkstat, int sync)
{
        int ret = 0;

        if (chkinfo->id.type == __POOL_CHUNK__) {
                ret = __pool_proto_update_dir__(pool_proto, chkinfo, chkstat, sync);
        } else if (chkinfo->id.type == __POOL_SUB_CHUNK__) {
                ret = __pool_proto_update_extern__(pool_proto, chkinfo, chkstat, sync);
        } else {
                YASSERT(0);
        }

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Replica check used while loading: calls lease_set(), runs
 * chunk_proto_rep_check() on the chunk, propagates the resulting
 * chkinfo/chkstat with __pool_proto_update_chkinfo_ref() (sync = 1) and calls
 * lease_set() once more.
 *
 * The parent passed to the replica check is the pool's parent id for a
 * __POOL_CHUNK__ chunk and the pool's own id for every other type.
 *
 * @param op not used by this function
 * @return 0 on success, otherwise the error of the failing step
 */
STATIC int __pool_proto_load_check__(pool_proto_t *pool_proto, chkinfo_t *chkinfo,
                                     chkstat_t *chkstat, int op)
{
        int ret;
        const fileid_t *parent;

        parent = (chkinfo->id.type == __POOL_CHUNK__)
                ? &pool_proto->parentid : &pool_proto->chkid;

        ret = lease_set(&pool_proto->lease);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = chunk_proto_rep_check(pool_proto->pool, chkinfo, chkstat, NULL, parent, 1,
                                    &pool_proto->lease.token, NULL, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_proto_update_chkinfo_ref(pool_proto, chkinfo, chkstat, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = lease_set(&pool_proto->lease);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Replica check variant that works directly on the caller's chkinfo/chkstat
 * (no cached table copy is consulted): calls lease_set(), runs
 * chunk_proto_rep_check(), and, if that changed chkinfo->info_version,
 * persists the new chkinfo in the place that references it:
 *   - __POOL_CHUNK__: via __pool_proto_table_update_remote();
 *   - otherwise: into slot chkid->idx of the pool's own table, after first
 *     checking that table with __pool_proto_chunk_check().
 * Finally chunk_cleanup_compare() is called with the pre-check copy and the
 * new chkinfo, and lease_set() is called again.
 *
 * Failures of the replica check or of either persist step zero
 * pool_proto->ltime before returning.
 *
 * @param op not used by this function
 * @return 0 on success, otherwise the error of the failing step
 */
STATIC int __pool_proto_pre_load_check__(pool_proto_t *pool_proto, chkinfo_t *chkinfo,
                                         chkstat_t *chkstat, int op)
{
        int ret;
        uint64_t info_version;
        const chkid_t *chkid;
        const fileid_t *parent;
        char item[DIR_PROTO_ITEM_SIZE];
        table_proto_t *table_parent;

        /* remember the version so a change made by the replica check can be detected */
        info_version = chkinfo->info_version;
        chkid = &chkinfo->id;
        if (chkid->type == __POOL_CHUNK__) {
                parent = &pool_proto->parentid;
        } else {
                parent = &pool_proto->chkid;
        }

        /* snapshot of chkinfo as it was before the check, for chunk_cleanup_compare() */
        chkinfo_t *old_chkinfo;
        char _chkinfo[CHKINFO_MAX];
        old_chkinfo = (void *)_chkinfo;
        CHKINFO_CP(old_chkinfo, chkinfo);

        ret = lease_set(&pool_proto->lease);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // TODO: if no primary replica can be elected, this ends up in a dead loop
        ret = chunk_proto_rep_check(pool_proto->pool, chkinfo, chkstat, NULL, parent, 1,
                &pool_proto->lease.token, NULL, NULL);
        if (unlikely(ret)) {
                DWARN("pool "CHKID_FORMAT" reset, ret %d\n", CHKID_ARG(&pool_proto->chkid), ret);
                pool_proto->ltime = 0;
                GOTO(err_ret, ret);
        }

        if (info_version != chkinfo->info_version) {
                DINFO("chunk "CHKID_FORMAT" update\n", CHKID_ARG(&chkinfo->id));

                if (chkid->type == __POOL_CHUNK__) {
                        /* the pool chunk itself is referenced from its parent */
                        ret = __pool_proto_table_update_remote(pool_proto, chkinfo, info_version);
                        if (unlikely(ret)) {
                                DWARN("pool "CHKID_FORMAT" reset\n", CHKID_ARG(&pool_proto->chkid));
                                pool_proto->ltime = 0;
                                GOTO(err_ret, ret);
                        }

                } else {
                        table_parent = pool_proto->table_proto;
                        /* need check parent first, maybe parent chkinfo is not all clean */
                        ret = __pool_proto_chunk_check(pool_proto, &table_parent->chkid);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        /* write the new chkinfo into this chunk's slot of the pool's own table */
                        memcpy(item, chkinfo, CHKINFO_SIZE(chkinfo->repnum));
                        ret = table_parent->update(table_parent, chkid->idx, item, CHKINFO_SIZE(chkinfo->repnum));
                        if (unlikely(ret)) {
                                DWARN("pool "CHKID_FORMAT" reset\n", CHKID_ARG(&pool_proto->chkid));
                                pool_proto->ltime = 0;
                                GOTO(err_ret, ret);
                        }

                }

                chunk_cleanup_compare(pool_proto->pool, &pool_proto->chkid, old_chkinfo, chkinfo);
        }

        ret = lease_set(&pool_proto->lease);
        if (unlikely(ret))
                GOTO(err_ret, ret);
        
        return 0;
err_ret:
        return ret;
}

/*
 * Return pointers to the cached chkinfo/chkstat of a chunk of this pool.
 *
 * A __POOL_SUB_CHUNK__ id selects table_array[chkid->idx]; any other id must
 * be of type __POOL_CHUNK__ (asserted) and selects the pool's own table.
 * The returned pointers refer to the cached objects, not to copies.
 *
 * @return 0 on success, ENOENT when the sub-chunk index is beyond
 *         table_count or its slot is empty
 */
STATIC int __pool_proto_chunk__(pool_proto_t *pool_proto, const chkid_t *chkid,
                               chkinfo_t **_chkinfo, chkstat_t **_chkstat)
{
        int ret;
        table_proto_t *table;

        YASSERT(chkid->id == pool_proto->chkid.id);

        if (chkid->type != __POOL_SUB_CHUNK__) {
                YASSERT(chkid->type == __POOL_CHUNK__);
                table = pool_proto->table_proto;
        } else {
                if ((int)chkid->idx >= pool_proto->table_count) {
                        ret = ENOENT;
                        GOTO(err_ret, ret);
                }

                table = pool_proto->table_array[chkid->idx];
                if (table == NULL) {
                        ret = ENOENT;
                        GOTO(err_ret, ret);
                }
        }

        *_chkinfo = table->chkinfo;
        *_chkstat = table->chkstat;

        return 0;
err_ret:
        return ret;
}

/*
 * Copy the cached chkinfo (and, if requested, chkstat) of a chunk into
 * caller-provided buffers.
 *
 * @param _chkinfo receives a copy of the chunk info
 * @param _chkstat receives a copy of the chunk status; may be NULL to skip it
 * @return 0 on success, otherwise the error from __pool_proto_chunk__()
 */
STATIC int __pool_proto_chunk(pool_proto_t *pool_proto, const chkid_t *chkid,
                             chkinfo_t *_chkinfo, chkstat_t *_chkstat)
{
        int ret;
        chkinfo_t *cached_info;
        chkstat_t *cached_stat;

        ret = __pool_proto_chunk__(pool_proto, chkid, &cached_info, &cached_stat);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        CHKINFO_CP(_chkinfo, cached_info);
        if (_chkstat != NULL)
                CHKSTAT_CP(_chkstat, cached_stat, cached_info->repnum);

        return 0;
err_ret:
        return ret;
}

/*
 * Decide whether a replica check is needed before using a chunk.
 *
 * The pool's own chunk is always tested with chunk_proto_consistent(); when
 * chkid is a __POOL_SUB_CHUNK__, that sub-chunk is tested as well. *check is
 * set to 1 if either one is not consistent, otherwise to 0.
 *
 * @return 0 on success, otherwise the error from __pool_proto_chunk__()
 */
STATIC int __pool_proto_chunk_needcheck__(pool_proto_t *pool_proto,
                                         const chkid_t *chkid, int *check)
{
        int ret;
        int sub_dirty = 0, pool_dirty = 0;
        chkinfo_t *info;
        chkstat_t *stat;

        if (chkid->type == __POOL_SUB_CHUNK__) {
                ret = __pool_proto_chunk__(pool_proto, chkid, &info, &stat);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                sub_dirty = !chunk_proto_consistent(info, stat, NULL);
        }

        ret = __pool_proto_chunk__(pool_proto, &pool_proto->chkid, &info, &stat);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        pool_dirty = !chunk_proto_consistent(info, stat, NULL);

        DINFO("chunk "CHKID_FORMAT" needcheck %u, %u\n",
              CHKID_ARG(chkid), sub_dirty, pool_dirty);

        *check = (sub_dirty || pool_dirty) ? 1 : 0;

        return 0;
err_ret:
        return ret;
}

/*
 * Thin wrapper around __pool_proto_chunk_needcheck__(); same parameters and
 * same result.
 */
STATIC int __pool_proto_chunk_needcheck(pool_proto_t *pool_proto,
                                       const chkid_t *chkid, int *check)
{
        int ret = __pool_proto_chunk_needcheck__(pool_proto, chkid, check);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Copy the cached chkinfo/chkstat of a chunk into the caller's buffers and run
 * chunk_proto_rep_check() on those copies. The cached objects themselves are
 * not modified here; the caller decides what to do with the result.
 *
 * The parent passed to the replica check is the pool's parent id for a
 * __POOL_CHUNK__ chunk and the pool's own id for every other type.
 *
 * @param op not used by this function
 * @return 0 on success, otherwise the error of the failing step
 */
STATIC int __pool_proto_chunk_check__(pool_proto_t *pool_proto, const chkid_t *chkid,
                                     int op, chkinfo_t *chkinfo, chkstat_t *chkstat)
{
        int ret;
        const chkid_t *parent;

        ret = __pool_proto_chunk(pool_proto, chkid, chkinfo, chkstat);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        parent = (chkid->type == __POOL_CHUNK__)
                ? &pool_proto->parentid : &pool_proto->chkid;

        ret = chunk_proto_rep_check(pool_proto->pool, chkinfo, chkstat, NULL, parent, 1,
                                    &pool_proto->lease.token, NULL, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Replica-check a sub-chunk (type asserted to be __POOL_SUB_CHUNK__) and write
 * the result back with __pool_proto_update_extern__() (sync = 1).
 *
 * One 4K cache buffer is used as scratch space: chkinfo lives in the first
 * half and chkstat in the second half. The buffer is released on every path.
 *
 * @return 0 on success, otherwise the error of the failing step
 */
STATIC int __pool_proto_chunk_check2__(pool_proto_t *pool_proto, const chkid_t *chkid, int op)
{
        int ret;
        char *scratch;
        chkinfo_t *info;
        chkstat_t *stat;

        YASSERT(chkid->type == __POOL_SUB_CHUNK__);
        scratch = mem_cache_calloc(MEM_CACHE_4K, 1);

        info = (void *)scratch;
        stat = (void *)(scratch + 4096 / 2);

        ret = __pool_proto_chunk_check__(pool_proto, chkid, op, info, stat);
        if (unlikely(ret))
                GOTO(out, ret);

        ret = __pool_proto_update_extern__(pool_proto, info, stat, 1);
        if (unlikely(ret))
                GOTO(out, ret);

        /* ret is 0 here; success and failure share the cleanup below */
out:
        mem_cache_free(MEM_CACHE_4K, scratch);
        return ret;
}

/*
 * Replica-check the pool chunk (type asserted to be __POOL_CHUNK__) and write
 * the result back with __pool_proto_update_dir__() (sync = 1).
 *
 * One 4K cache buffer is used as scratch space: chkinfo lives in the first
 * half and chkstat in the second half. The buffer is released on every path.
 *
 * @return 0 on success, otherwise the error of the failing step
 */
STATIC int __pool_proto_chunk_check3__(pool_proto_t *pool_proto, const chkid_t *chkid, int op)
{
        int ret;
        char *scratch;
        chkinfo_t *info;
        chkstat_t *stat;

        YASSERT(chkid->type == __POOL_CHUNK__);
        scratch = mem_cache_calloc(MEM_CACHE_4K, 1);

        info = (void *)scratch;
        stat = (void *)(scratch + 4096 / 2);

        ret = __pool_proto_chunk_check__(pool_proto, chkid, op, info, stat);
        if (unlikely(ret))
                GOTO(out, ret);

        ret = __pool_proto_update_dir__(pool_proto, info, stat, 1);
        if (unlikely(ret))
                GOTO(out, ret);

        /* ret is 0 here; success and failure share the cleanup below */
out:
        mem_cache_free(MEM_CACHE_4K, scratch);
        return ret;
}

/*
 * Make sure a chunk of this pool is consistent before it is used.
 *
 * chkid must be either the pool's own chunk id or a __POOL_SUB_CHUNK__
 * (asserted). If __pool_proto_chunk_needcheck() reports that a check is
 * needed, the sub-chunk (when chkid is one) is checked first and the pool's
 * own chunk is checked afterwards. A failing check zeroes pool_proto->ltime.
 *
 * @return 0 on success or when no check was needed, otherwise the error of
 *         the failing step
 */
STATIC int __pool_proto_chunk_check(pool_proto_t *pool_proto, const chkid_t *chkid)
{
        int ret, need_check;

        YASSERT(chkid_cmp(chkid, &pool_proto->chkid) == 0 || chkid->type == __POOL_SUB_CHUNK__);

        ret = __pool_proto_chunk_needcheck(pool_proto, chkid, &need_check);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (!need_check)
                return 0;

        if (chkid->type == __POOL_SUB_CHUNK__) {
                ret = __pool_proto_chunk_check2__(pool_proto, chkid, __OP_WRITE);
                if (unlikely(ret)) {
                        DWARN("pool "CHKID_FORMAT" reset\n", CHKID_ARG(&pool_proto->chkid));
                        pool_proto->ltime = 0;
                        GOTO(err_ret, ret);
                }
        }

        ret = __pool_proto_chunk_check3__(pool_proto, &pool_proto->chkid, __OP_WRITE);
        if (unlikely(ret)) {
                DWARN("pool "CHKID_FORMAT" reset\n", CHKID_ARG(&pool_proto->chkid));
                pool_proto->ltime = 0;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Fill in a chkinfo for a new chunk.
 *
 * Replica disks are obtained from dispatch_newdisk(), asking for
 * gloconf.metadata_replica replicas with a minimum of 1 in solo mode and
 * LICH_REPLICA_MIN otherwise. The chunk id is copied from chkid when given,
 * otherwise a fresh one is obtained from dispatch_newid().
 *
 * @param pool       pool name passed to dispatch_newdisk()
 * @param chkid      id to use, or NULL to allocate a new one
 * @param chkinfo    output; zeroed (sizeof(*chkinfo) bytes) before being filled
 * @param chunk_type not used by this function
 * @return 0 on success, otherwise the error of the failing dispatch call
 */
STATIC int __pool_proto_newinfo(const char *pool, const chkid_t *chkid, chkinfo_t *chkinfo, int chunk_type)
{
        int ret;
        int repnum = gloconf.metadata_replica;
        int repmin = cluster_is_solomode() ? 1 : LICH_REPLICA_MIN;
        diskid_t disks[LICH_REPLICA_MAX];

        ret = dispatch_newdisk(disks, &repnum, repmin, pool, NULL, 0, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(chkinfo, 0x0, sizeof(*chkinfo));
        diskid2loc(chkinfo->diskid, disks, repnum);

        if (chkid == NULL) {
                ret = dispatch_newid(&chkinfo->id);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                chkinfo->id = *chkid;
        }

        chkinfo->repnum = repnum;

        CHKINFO_DUMP(chkinfo, D_INFO);

        return 0;
err_ret:
        return ret;
}

/**
 * Allocate a new sub-pool chunk (sub table) and attach it to the pool.
 *
 * Steps, in order: build a chkinfo for index table_count, create the table
 * with table_proto_create(), record its chkinfo in slot idx of the pool's own
 * table, extend the in-memory table array and load the new table.
 *
 * @param pool_proto   pool to extend
 * @param _table_proto if not NULL, receives pool_proto->table_array[idx]
 * @return 0 on success; ENOSPC when table_count already equals
 *         DIR_PROTO_EXTERN_ITEM_COUNT; otherwise the error of the failing step
 *
 * NOTE(review): nothing in this function undoes table_proto_create() or the
 * parent-slot insert when a later step fails -- confirm whether cleanup
 * happens elsewhere.
 */
STATIC int pool_proto_newtable(pool_proto_t *pool_proto, table_proto_t **_table_proto)
{
        int ret, idx;
        chkid_t chkid;
        table_proto_t *table_parent;
        chkinfo_t *chkinfo;
        char buf[MAX_BUF_LEN];
        char item[DIR_PROTO_ITEM_SIZE];

        if (pool_proto->table_count == DIR_PROTO_EXTERN_ITEM_COUNT) {
                ret = ENOSPC;
                GOTO(err_ret, ret);
        }

        /* the new sub chunk shares the pool's id and takes the next free index */
        idx = pool_proto->table_count;
        chkid = pool_proto->chkid;
        chkid.idx = idx;
        chkid.type = __POOL_SUB_CHUNK__;

        /* buf is only used as backing storage for the chkinfo */
        chkinfo = (void *)buf;
        ret = __pool_proto_newinfo(pool_proto->pool, &chkid, chkinfo, __POOL_CHUNK__);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = table_proto_create(pool_proto->pool, chkinfo, &pool_proto->chkid, net_getnid(),
                                 TABLE_PROTO_HEAD, NULL, 0, NULL, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // insert subpool into pool slot
        table_parent = pool_proto->table_proto;
        memcpy(item, chkinfo, CHKINFO_SIZE(chkinfo->repnum));
        ret = table_parent->insert(table_parent, idx, item, CHKINFO_SIZE(chkinfo->repnum));
        if (unlikely(ret))
                GOTO(err_ret, ret);

#if 0
        ret = pool_proto_chunk_check(pool_proto, &pool_proto->chkid, __OP_WRITE);
        if (unlikely(ret)) {
                pool_proto->ltime = 0;
                DERROR("chunk "CHKID_FORMAT"\n", CHKID_ARG(_proto->chkid));
                GOTO(err_ret, ret);
        }
#endif

        ret = __pool_proto_extend(pool_proto, &chkid, __OP_WRITE);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* a failed load zeroes ltime */
        ret = __pool_proto_load_extern(pool_proto, chkinfo, 1);
        if (unlikely(ret)) {
                pool_proto->ltime = 0;
                DWARN("pool "CHKID_FORMAT" reset\n", CHKID_ARG(&pool_proto->chkid));
                GOTO(err_ret, ret);
        }

        /* after the steps above table_count is expected to be exactly idx + 1 */
        YASSERT(idx + 1 == pool_proto->table_count);

        if (_table_proto) {
                *_table_proto = pool_proto->table_array[idx];
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Thin wrapper around pool_proto_newtable(); same parameters and same result.
 */
STATIC int __pool_proto_newtable(pool_proto_t *pool_proto, table_proto_t **_table_proto)
{
        int ret = pool_proto_newtable(pool_proto, _table_proto);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Look up the in-memory table of a sub-chunk.
 *
 * tableid must be of type __POOL_SUB_CHUNK__ (asserted).
 * __pool_proto_extend() is called with __OP_READ before table_array is
 * indexed with tableid->idx.
 *
 * @param _table receives the table on success
 * @return 0 on success, ENOENT when the slot is empty, otherwise the error
 *         from __pool_proto_extend()
 */
STATIC int __pool_proto_table(pool_proto_t *pool_proto, const chkid_t *tableid,
                             table_proto_t **_table)
{
        int ret, slot;
        table_proto_t *table;

        YASSERT(tableid->type == __POOL_SUB_CHUNK__);
        slot = tableid->idx;

        ret = __pool_proto_extend(pool_proto, tableid, __OP_READ);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        table = pool_proto->table_array[slot];
        if (table == NULL) {
                ret = ENOENT;
                GOTO(err_ret, ret);
        }

        *_table = table;

        return 0;
err_ret:
        return ret;
}

/*
 * Find a free slot in one of the pool's sub tables, creating a new sub table
 * when none of the existing ones has room.
 *
 * The scan starts at a random table index and wraps around once. Tables that
 * answer ENOSPC are skipped, and so are empty (NULL) table_array slots: other
 * lookups in this file (__pool_proto_chunk__, __pool_proto_table) treat a NULL
 * slot as possible, and dereferencing one here would crash.
 *
 * @param pool_proto   pool to search
 * @param _table_proto receives the table that owns the free slot
 * @param _loc         receives the slot (loc) and the table's chunk index (idx)
 * @return 0 on success, otherwise the error of the failing step; a failure to
 *         create a new table also zeroes pool_proto->ltime
 */
STATIC int __pool_proto_get_empty(pool_proto_t *pool_proto, table_proto_t **_table_proto, loc_t *_loc)
{
        int ret, r, found = 0, i;
        table_proto_t *table_proto = NULL;
        uint32_t loc = 0;

        if (pool_proto->table_count) {
                /* random start; the reason is not documented in the original code */
                r = _random() % pool_proto->table_count;

                for (i = 0; i < pool_proto->table_count; i++) {
                        table_proto = pool_proto->table_array[(i + r) % pool_proto->table_count];
                        if (table_proto == NULL) {
                                /* empty slot: nothing to ask, try the next table */
                                continue;
                        }

                        ret = table_proto->get_empty(table_proto, &loc);
                        if (unlikely(ret)) {
                                if (ret == ENOSPC) {
                                        continue;
                                } else
                                        GOTO(err_ret, ret);
                        }

                        found = 1;
                        break;
                }
        }

        if (found == 0) {
                /* every existing table is full or missing: add a new one */
                ret = __pool_proto_newtable(pool_proto, &table_proto);
                if (unlikely(ret)) {
                        DWARN("pool "CHKID_FORMAT" reset\n", CHKID_ARG(&pool_proto->chkid));
                        pool_proto->ltime = 0;
                        GOTO(err_ret, ret);
                }

                ret = table_proto->get_empty(table_proto, &loc);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        *_table_proto = table_proto;
        _loc->loc = loc;
        _loc->idx = table_proto->chkinfo->id.idx;

        return 0;
err_ret:
        return ret;
}

/*
 * Persist a (name, chkinfo) pair into a free slot of one of the pool's sub
 * tables.
 *
 * Item layout: the NUL-terminated key at offset 0 and the chkinfo at offset
 * DIR_PROTO_EXTERN_ITEM_SIZE / 2. The number of bytes handed to insert() is
 * DIR_PROTO_EXTERN_ITEM_SIZE / 2 + CHKINFO_SIZE(repnum), which includes the
 * gap between the end of the key and the start of the value. The buffer is
 * therefore zero-filled first, so that gap holds zeros instead of whatever
 * happened to be on the stack.
 *
 * The pool chunk is checked before a slot is picked, and the chosen table is
 * checked both before and after the insert; a failing post-insert check zeroes
 * pool_proto->ltime.
 *
 * NOTE(review): the key is copied with strcpy(); callers visible in this file
 * bound it by DIR_PROTO_NAME_SIZE -- confirm that this is not larger than
 * DIR_PROTO_EXTERN_ITEM_SIZE / 2.
 *
 * @param key     entry name
 * @param chkinfo chunk info stored as the value
 * @param _loc    receives the location of the inserted item
 * @return 0 on success, otherwise the error of the failing step
 */
STATIC int __pool_proto_insert__(pool_proto_t *pool_proto, const char *key,
                                   const chkinfo_t *chkinfo, loc_t *_loc)
{
        int ret;
        char item[DIR_PROTO_EXTERN_ITEM_SIZE];
        char *value;
        table_proto_t *table_proto;
        loc_t loc;

        /* never write uninitialized stack bytes into the table */
        memset(item, 0x0, sizeof(item));

        strcpy(item, key);
        value = (void *)&item[DIR_PROTO_EXTERN_ITEM_SIZE / 2];
        memcpy((void *)value, chkinfo, CHKINFO_SIZE(chkinfo->repnum));

        ret = __pool_proto_chunk_check(pool_proto, &pool_proto->chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_proto_get_empty(pool_proto, &table_proto, &loc);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_proto_chunk_check(pool_proto, &table_proto->chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = table_proto->insert(table_proto, loc.loc, item,
                                  DIR_PROTO_EXTERN_ITEM_SIZE / 2 + CHKINFO_SIZE(chkinfo->repnum));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* re-check after the write; a failure here zeroes ltime */
        ret = __pool_proto_chunk_check(pool_proto, &table_proto->chkid);
        if (unlikely(ret)) {
                DWARN("pool "CHKID_FORMAT" reset\n", CHKID_ARG(&pool_proto->chkid));
                pool_proto->ltime = 0;
                DERROR("chunk "CHKID_FORMAT"\n", CHKID_ARG(&table_proto->chkid));
                SWARN(0, "%s, chunk "CHKID_FORMAT" check fail\n", M_DATA_CHUNK_WARN, CHKID_ARG(&table_proto->chkid));
                GOTO(err_ret, ret);
        }

        *_loc = loc;

        return 0;
err_ret:
        return ret;
}

/*
 * Insert a named chunk into the pool: persist it with __pool_proto_insert__()
 * and then register it in the in-memory name and id hash tables.
 *
 * @param pool_proto pool to insert into
 * @param name       entry name; must be non-empty (asserted) and, including
 *                   the trailing NUL, fit in DIR_PROTO_NAME_SIZE
 * @param chkinfo    chunk info of the new entry
 * @return 0 on success; EINVAL when the name is too long; EEXIST when the name
 *         is already present; otherwise the error from the persistent insert
 *
 * Every error except EEXIST zeroes pool_proto->ltime.
 * NOTE(review): that includes the EINVAL returned for an over-long name,
 * which is raised before any state has been touched -- confirm the reset is
 * intended for that case.
 */
int pool_proto_insert(pool_proto_t *pool_proto, const char *name,
                     const chkinfo_t *chkinfo)
{
        int ret;
        entry_t *ent;
        loc_t loc;

        YASSERT(strlen(name));

        if (strlen(name) + 1 > DIR_PROTO_NAME_SIZE) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        ent = hash_table_find(pool_proto->name_tab, (void *)name);
        if (ent) {
                ret = EEXIST;
                GOTO(err_ret, ret);
        }

        /* persist first; the in-memory tables are only updated afterwards */
        ret =  __pool_proto_insert__(pool_proto, name, chkinfo, &loc);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        DINFO("insert %s id "CHKID_FORMAT"\n", name, CHKID_ARG(&chkinfo->id));

        /* past this point failures are not rolled back; they hit UNIMPLEMENTED/YASSERT */
        ret = __pool_proto_alloc(&ent, name, chkinfo, &loc);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        ret = hash_table_insert(pool_proto->name_tab, (void *)ent, ent->name, 0);
        if (unlikely(ret)) {
                DWARN("key %s\n", ent->name);
                YASSERT(0);
        }

        ret = hash_table_insert(pool_proto->id_tab, (void *)ent, &ent->chkinfo->id, 0);
        if (unlikely(ret)) {
                DWARN("key %s\n", ent->name);
                YASSERT(0);
        }

        return 0;
err_ret:
        if (ret != EEXIST) {
                DERROR("pool "CHKID_FORMAT" reset\n", CHKID_ARG(&pool_proto->chkid));
                pool_proto->ltime = 0;
        }
        return ret;
}

/*
 * Create a child pool named `name` under this pool.
 *
 * Steps, in order: validate the name, refuse while the pool's names are
 * locked, refuse duplicates, build a chkinfo with a fresh id (type
 * __POOL_CHUNK__), build the child's fileinfo from setattr, create the child's
 * table with table_proto_create() and finally insert the (name, chkinfo)
 * entry into this pool.
 *
 * @param pool_proto pool that will contain the new pool
 * @param parent     chunk id passed to table_proto_create() as the parent
 * @param name       name of the new pool
 * @param site_name  not used by this function
 * @param setattr    attributes applied to the new pool's fileinfo
 * @param chkinfo    output: chunk info of the new pool
 * @return 0 on success; ENAMETOOLONG, EAGAIN (names locked) or EEXIST from
 *         the checks; otherwise the error of the failing step
 *
 * NOTE(review): on success the new child's fileinfo is copied into
 * pool_proto->fileinfo, i.e. into the containing pool; __pool_proto_mkvol has
 * no such copy. Confirm this is intended.
 */
STATIC int __pool_proto_mkpool(pool_proto_t *pool_proto, const chkid_t *parent,
                const char *name, const char *site_name, const setattr_t *setattr, chkinfo_t *chkinfo)
{
        int ret;
        fileinfo_t fileinfo;
        entry_t *ent;

        if (strlen(name) + 1 > DIR_PROTO_NAME_SIZE) {
                ret = ENAMETOOLONG;
                GOTO(err_ret, ret);
        }

        /* names are locked: call the rename-lock notifier and ask the caller to retry */
        if (pool_proto->name_locked) {
                pool_rename_lock_notify(NULL);
                DWARN(""CHKID_FORMAT" locked\n", CHKID_ARG(&pool_proto->chkid));
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        ent = hash_table_find(pool_proto->name_tab, (void *)name);
        if (ent) {
                ret = EEXIST;
                GOTO(err_ret, ret);
        }

        /* NULL chkid: a fresh id is allocated for the new pool */
        ret = __pool_proto_newinfo(pool_proto->pool, NULL, chkinfo, __POOL_CHUNK__);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        chkinfo->id.type = __POOL_CHUNK__;

        memset(&fileinfo, 0x0, sizeof(fileinfo));
        ret = md_proto_setattr(&fileinfo, setattr, &chkinfo->id);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DINFO("mkpool %s @ "CHKID_FORMAT" newid "CHKID_FORMAT", replica %u/%u\n",
              name, CHKID_ARG(parent), CHKID_ARG(&chkinfo->id), fileinfo.repnum_usr, fileinfo.repnum_sys);

        ret = table_proto_create(pool_proto->pool, chkinfo, parent, net_getnid(),
                                 TABLE_PROTO_HEAD, &fileinfo, sizeof(fileinfo), NULL, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = pool_proto_insert(pool_proto, name, chkinfo);
        if (unlikely(ret)) {
                DWARN("mkpool %s @ "CHKID_FORMAT" newid "CHKID_FORMAT" fail\n",
                      name, CHKID_ARG(parent), CHKID_ARG(&chkinfo->id));

                /* the freshly created chunk is unlinked only for the EEXIST case */
                if (ret == EEXIST) {
                        chunk_proto_rep_unlink(chkinfo, NULL);
                }

                GOTO(err_ret, ret);
        }

        memcpy(&pool_proto->fileinfo, &fileinfo, sizeof(fileinfo_t));
        return 0;
err_ret:
        return ret;
}

/*
 * Apply setattr to the pool's fileinfo and persist the result.
 *
 * The change is made on a local copy, written to the table with setinfo()
 * (TABLE_PROTO_INFO_ATTR), and only then copied back into
 * pool_proto->fileinfo, so the cached fileinfo is untouched on failure.
 *
 * @param table_proto table that stores the fileinfo
 * @param force       when 0 the table's chunk is checked with
 *                    __pool_proto_chunk_check() before writing; any other
 *                    value skips that check
 * @return 0 on success, otherwise the error of the failing step
 */
STATIC int __pool_proto_setattr__(pool_proto_t *pool_proto, table_proto_t *table_proto,
                                 const setattr_t *setattr, int force)
{
        int ret;
        fileinfo_t pending;

        memcpy(&pending, &pool_proto->fileinfo, sizeof(fileinfo_t));

        ret = md_proto_setattr(&pending, setattr, &pool_proto->chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DINFO("file "CHKID_FORMAT" replica %u/%u\n", CHKID_ARG(&pending.id), pending.repnum_usr, pending.repnum_sys);

        if (!force) {
                ret = __pool_proto_chunk_check(pool_proto, &table_proto->chkid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        ret = table_proto->setinfo(table_proto, &pending, sizeof(pending), TABLE_PROTO_INFO_ATTR);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memcpy(&pool_proto->fileinfo, &pending, sizeof(fileinfo_t));

        return 0;
err_ret:
        return ret;
}

/*
 * Apply setattr to the pool through its own table (with the chunk check
 * enabled, force = 0) and optionally hand back the resulting fileinfo.
 *
 * @param _fileinfo if not NULL, receives a copy of the updated fileinfo
 * @return 0 on success, otherwise the error from __pool_proto_setattr__()
 */
STATIC int __pool_proto_setattr(pool_proto_t *pool_proto, fileinfo_t *_fileinfo, const setattr_t *setattr)
{
        int ret;

        ret = __pool_proto_setattr__(pool_proto, pool_proto->table_proto, setattr, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (_fileinfo != NULL)
                memcpy(_fileinfo, &pool_proto->fileinfo, sizeof(pool_proto->fileinfo));

        return 0;
err_ret:
        return ret;
}

/*
 * Create a volume named `name` inside this pool.
 *
 * Steps, in order: validate the replica count against the number of fault
 * domains (skipped in solo mode), validate the name, refuse volumes directly
 * under root, refuse while the pool's names are locked, refuse duplicates,
 * build a chkinfo with a fresh id (type __VOLUME_CHUNK__), build the volume's
 * fileinfo from setattr, create the volume's table with table_proto_create()
 * and finally insert the (name, chkinfo) entry into this pool.
 *
 * @param pool_proto pool that will contain the volume
 * @param parent     chunk id passed to table_proto_create() as the parent
 * @param name       volume name
 * @param site_name  not used by this function
 * @param setattr    attributes of the new volume; replica must be set and not
 *                   exceed LICH_REPLICA_MAX (both asserted)
 * @param chkinfo    output: chunk info of the new volume
 * @return 0 on success; ENOSPC (not enough fault domains), ENAMETOOLONG, EPERM
 *         (pool's parent is root), EAGAIN (names locked) or EEXIST from the
 *         checks; otherwise the error of the failing step
 */
STATIC int __pool_proto_mkvol(pool_proto_t *pool_proto, const chkid_t *parent,
                              const char *name, const char *site_name, const setattr_t *setattr, chkinfo_t *chkinfo)
{
        int ret, total, online;
        fileinfo_t fileinfo;
        entry_t *ent;

        YASSERT(setattr->replica.set_it);
        YASSERT(setattr->replica.val <= LICH_REPLICA_MAX);

        if (!cluster_is_solomode()) {
                ret = conn_faultdomain(&total, &online);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                /* requires at least replica.val + 1 fault domains in total */
                if (setattr->replica.val + 1 > total) {
                        ret = ENOSPC;
                        GOTO(err_ret, ret);
                }
        }

        if (strlen(name) + 1 > DIR_PROTO_NAME_SIZE) {
                ret = ENAMETOOLONG;
                GOTO(err_ret, ret);
        }

        if (chkid_isroot(&pool_proto->parentid)) {
                DWARN("mkvol @ root is not permitted\n");
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        /* names are locked: call the rename-lock notifier and ask the caller to retry */
        if (pool_proto->name_locked) {
                pool_rename_lock_notify(NULL);
                DWARN(""CHKID_FORMAT" locked\n", CHKID_ARG(&pool_proto->chkid));
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        ent = hash_table_find(pool_proto->name_tab, (void *)name);
        if (ent) {
                DWARN("mkvol %s @ "CHKID_FORMAT"\n", name, CHKID_ARG(&pool_proto->chkid));
                ret = EEXIST;
                GOTO(err_ret, ret);
        }

        // allocate the chunk and its replica locations
        ret = __pool_proto_newinfo(pool_proto->pool, NULL, chkinfo, __VOLUME_CHUNK__);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        chkinfo->id.type = __VOLUME_CHUNK__;

        // build the fileinfo
        memset(&fileinfo, 0x0, sizeof(fileinfo));
        fileinfo.snap_from = (uint64_t) -1;
        ret = md_proto_setattr(&fileinfo, setattr, &chkinfo->id);
        if (unlikely(ret))
                GOTO(err_ret, ret);

#ifdef LSV
        fileinfo.max_size = VOLUME_SIZE_MAX;
        fileinfo.volume_page_id = 1;
        fileinfo.gc_os_page_id = 2;
        fileinfo.gc_bitmap_page_id = 3;
        fileinfo.rcache_page_id = 4;
        fileinfo.wbuf_page_id = 8;

        // update in load stage
        fileinfo.bitmap_chunk_id = 0;
#endif

        DINFO("mkvol %s @ "CHKID_FORMAT" newid "CHKID_FORMAT" replica %u/%u, %u/%u\n", name,
              CHKID_ARG(&pool_proto->chkid), CHKID_ARG(&chkinfo->id),
              pool_proto->fileinfo.repnum_usr, pool_proto->fileinfo.repnum_sys,
              fileinfo.repnum_usr, fileinfo.repnum_sys);

        ret = table_proto_create(pool_proto->pool, chkinfo, parent, net_getnid(),
                                 TABLE_PROTO_HEAD, &fileinfo, sizeof(fileinfo), NULL, 0);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        // TODO: a failure at this point leaves garbage rows in the sqlite metadata table

        ret = pool_proto_insert(pool_proto, name, chkinfo);
        if (unlikely(ret)) {
                DWARN("mkvol %s @ "CHKID_FORMAT" newid "CHKID_FORMAT" fail\n", name,
                      CHKID_ARG(&pool_proto->chkid), CHKID_ARG(&chkinfo->id));

                /* the freshly created chunk is unlinked only for the EEXIST case */
                if (ret == EEXIST) {
                        chunk_proto_rep_unlink(chkinfo, NULL);
                }

                GOTO(err_ret, ret);
        }

#if ENABLE_BALANCE
        /* NOTE(review): an error here is returned although the volume has already been inserted */
        ret = stor_mkvol_sync2master(network_rname(&chkinfo->diskid[0].id), &chkinfo->id);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }
#endif
        return 0;
err_ret:
        return ret;
}

/*
 * Register an already-built @chkinfo under @name in this pool.
 *
 * Unlike __pool_proto_mkvol() nothing is allocated or created here; the
 * function only inserts the (name, chkinfo) pair via pool_proto_insert().
 *
 * @parent is accepted for signature compatibility and ignored.
 *
 * Returns 0 on success, EAGAIN when the name table is locked, or the error
 * reported by pool_proto_insert().
 */
STATIC int __pool_proto_mkvolwith(pool_proto_t *pool_proto, const chkid_t *parent,
                                  const char *name, const chkinfo_t *chkinfo)
{
        int ret;

        (void) parent;

        if (!pool_proto->name_locked) {
                DINFO("mkvol %s @ "CHKID_FORMAT" newid "CHKID_FORMAT" replica %u/%u\n", name,
                      CHKID_ARG(&pool_proto->chkid), CHKID_ARG(&chkinfo->id),
                      pool_proto->fileinfo.repnum_usr, pool_proto->fileinfo.repnum_sys);

                ret = pool_proto_insert(pool_proto, name, chkinfo);
                if (unlikely(ret)) {
                        DWARN("mkvol %s @ "CHKID_FORMAT" newid "CHKID_FORMAT" fail\n", name,
                              CHKID_ARG(&pool_proto->chkid), CHKID_ARG(&chkinfo->id));
                        GOTO(err_ret, ret);
                }

                return 0;
        }

        /* Name table is locked: notify the rename lock and ask for a retry. */
        pool_rename_lock_notify(NULL);
        DWARN(""CHKID_FORMAT" locked\n", CHKID_ARG(&pool_proto->chkid));
        ret = EAGAIN;
        GOTO(err_ret, ret);

err_ret:
        return ret;
}

/*
 * Look up @name in the pool's name table and copy the matching chunk info
 * into @chkinfo.
 *
 * The copy length is CHKINFO_SIZE() of the stored entry's repnum, so
 * @chkinfo must be large enough to receive it.
 *
 * Returns 0 on success, ENOENT when the name is not present.
 */
STATIC int __pool_proto_lookup(pool_proto_t *pool_proto,
                               const char *name, chkinfo_t *chkinfo)
{
        int ret;
        entry_t *found;

        found = hash_table_find(pool_proto->name_tab, (void *)name);
        if (found != NULL) {
                memcpy(chkinfo, found->chkinfo, CHKINFO_SIZE(found->chkinfo->repnum));
                /* Only pools and volumes are expected in the name table. */
                YASSERT(chkinfo->id.type == __POOL_CHUNK__ || chkinfo->id.type == __VOLUME_CHUNK__);

                return 0;
        }

        ret = ENOENT;
        DBUG("find %s @ "CHKID_FORMAT"\n", name,
              CHKID_ARG(&pool_proto->table_proto->chkinfo->id));
        GOTO(err_ret, ret);

err_ret:
        return ret;
}

/*
 * Remove the entry named @key from the pool.
 *
 * The entry is deleted from its backing table first (with a chunk check
 * before and after), then removed from both in-memory hash tables (by name
 * and by chunk id) and freed. For entries that are not volumes the chunk's
 * replicas are unlinked as well.
 *
 * Returns 0 on success, ENOENT when @key is unknown, or the error reported by
 * a callee. On any error other than ENOENT, pool_proto->ltime is reset to 0.
 */
int pool_proto_del(pool_proto_t *pool_proto, const char *key)
{
        int ret;
        entry_t *ent, *ent1, *ent2;
        table_proto_t *table_proto;

        DINFO("pool %s name %s\n", pool_proto->pool, key);

        ent = hash_table_find(pool_proto->name_tab, (void *)key);
        if (ent == NULL) {
                ret = ENOENT;
                GOTO(err_ret, ret);
        }

        table_proto = pool_proto->table_array[ent->loc.idx];

        ret = __pool_proto_chunk_check(pool_proto, &table_proto->chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = table_proto->del(table_proto, ent->loc.loc);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_proto_chunk_check(pool_proto, &table_proto->chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* The persistent record is gone; drop both in-memory indexes. */
        ret = hash_table_remove(pool_proto->name_tab, key, (void **)&ent1);
        YASSERT(ret == 0);
        ret = hash_table_remove(pool_proto->id_tab, &ent->chkinfo->id, (void **)&ent2);
        YASSERT(ret == 0);

        if (ent->chkinfo->id.type != __VOLUME_CHUNK__) {
                chunk_proto_rep_unlink(ent->chkinfo, NULL);
        }

        /*
         * Both tables must have returned the entry we looked up. These were
         * previously written as assignments ("ent = ent1"), which checked
         * nothing and overwrote ent.
         */
        YASSERT(ent == ent1);
        YASSERT(ent == ent2);

        yfree((void **)&ent->name);
        yfree((void **)&ent->chkinfo);
        yfree((void **)&ent);

        return 0;
err_ret:
        if (ret != ENOENT) {
                DERROR("pool "CHKID_FORMAT" reset\n", CHKID_ARG(&pool_proto->chkid));
                pool_proto->ltime = 0;
        }
        return ret;
}

/*
 * Remove the volume @name from this pool by delegating to pool_rmvol().
 * @force is passed through unchanged. Returns pool_rmvol()'s result.
 */
STATIC int __pool_proto_unlink(pool_proto_t *pool_proto, const char *name, int force)
{
        int ret;

        ret = pool_rmvol(pool_proto, name, force);

        return ret;
}

/*
 * Delete the entry @name from the pool, but only if it is a volume.
 *
 * Returns 0 on success, ENOENT when @name is unknown, EISDIR when the entry
 * is not a volume chunk, or the error reported by pool_proto_del().
 */
STATIC int __pool_proto_cleanup(pool_proto_t *pool_proto, const char *name)
{
        int ret;
        entry_t *victim;

        victim = hash_table_find(pool_proto->name_tab, (void *)name);
        if (!victim) {
                ret = ENOENT;
                GOTO(err_ret, ret);
        }

        if (victim->chkinfo->id.type == __VOLUME_CHUNK__) {
                ret = pool_proto_del(pool_proto, name);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                return 0;
        }

        ret = EISDIR;
        GOTO(err_ret, ret);

err_ret:
        return ret;
}

/*
 * Delete the entry @name from the pool, but only if it is a sub-pool.
 *
 * Returns 0 on success, ENOENT when @name is unknown, ENOTDIR when the entry
 * is not a pool chunk, or the error reported by pool_proto_del().
 */
STATIC int __pool_proto_rmpool(pool_proto_t *pool_proto, const char *name)
{
        int ret;
        entry_t *victim;

        victim = hash_table_find(pool_proto->name_tab, (void *)name);
        if (!victim) {
                ret = ENOENT;
                GOTO(err_ret, ret);
        }

        if (victim->chkinfo->id.type == __POOL_CHUNK__) {
                ret = pool_proto_del(pool_proto, name);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                return 0;
        }

        ret = ENOTDIR;
        GOTO(err_ret, ret);

err_ret:
        return ret;
}

/*
 * Rewrite the on-table record at @loc with (@key, @chkinfo).
 *
 * The record is staged in a scratch buffer: the key string at offset 0 and
 * the chunk info at offset DIR_PROTO_EXTERN_ITEM_SIZE / 2. The owning table
 * is chunk-checked before and after the update call.
 *
 * Returns 0 on success. On any failure the scratch buffer is released,
 * pool_proto->ltime is reset to 0 and the callee's error is returned.
 */
int pool_proto_update(pool_proto_t *pool_proto,
                           const char *key, const chkinfo_t *chkinfo, const loc_t *loc)
{
        int ret;
        char *record;
        char *payload;
        table_proto_t *table;

        record = mem_cache_calloc1(MEM_CACHE_4K, DIR_PROTO_EXTERN_ITEM_SIZE);

        /* Key occupies the first half of the record, chunk info the second. */
        strcpy(record, key);
        payload = (void *)(record + DIR_PROTO_EXTERN_ITEM_SIZE / 2);
        memcpy((void *)payload, chkinfo, CHKINFO_SIZE(chkinfo->repnum));

        table = pool_proto->table_array[loc->idx];

        ret = __pool_proto_chunk_check(pool_proto, &table->chkid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = table->update(table, loc->loc, record,
                            DIR_PROTO_EXTERN_ITEM_SIZE / 2 + CHKINFO_SIZE(chkinfo->repnum));
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __pool_proto_chunk_check(pool_proto, &table->chkid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        mem_cache_free(MEM_CACHE_4K, record);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, record);
        DWARN("pool "CHKID_FORMAT" reset\n", CHKID_ARG(&pool_proto->chkid));
        pool_proto->ltime = 0;
        return ret;
}

/*
 * Update an existing entry both on its table and in memory.
 *
 * @_key and @_chkinfo are optional; a NULL value means "keep what @ent
 * currently holds". The table record is rewritten first through
 * pool_proto_update(); only then is the in-memory entry refreshed with
 * __pool_proto_realloc(), whose failure is treated as fatal.
 *
 * Returns 0 on success, or the error from pool_proto_update() (in which case
 * pool_proto->ltime is reset to 0).
 */
STATIC int __pool_proto_update(pool_proto_t *pool_proto, entry_t *ent,
                               const char *_key, const chkinfo_t *_chkinfo)
{
        int ret;
        const char *key = ent->name;
        const chkinfo_t *chkinfo = ent->chkinfo;

        if (_key) {
                key = _key;
        }

        if (_chkinfo) {
                chkinfo = _chkinfo;
        }

        ret = pool_proto_update(pool_proto, key, chkinfo, &ent->loc);
        if (unlikely(ret)) {
                DWARN("pool "CHKID_FORMAT" reset\n", CHKID_ARG(&pool_proto->chkid));
                pool_proto->ltime = 0;
                GOTO(err_ret, ret);
        }

        ret = __pool_proto_realloc(ent, key, chkinfo);
        if (unlikely(ret)) {
                YASSERT(0);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Fetch the chunk info for @chkid into @chkinfo.
 *
 * Sub-chunks of the pool (__POOL_SUB_CHUNK__) are resolved through
 * __pool_proto_chunk(); in that case @_ent must be NULL. Every other id is
 * looked up in the pool's id table.
 *
 * When @_ent is non-NULL it receives the matching id-table entry.
 *
 * Returns 0 on success, ENOENT when the id is not in the id table, or the
 * error from __pool_proto_chunk().
 */
STATIC int __pool_proto_getinfo(pool_proto_t *pool_proto, const chkid_t *chkid,
                                entry_t **_ent, chkinfo_t *chkinfo)
{
        int ret;
        entry_t *entry = NULL;

        if (chkid->type != __POOL_SUB_CHUNK__) {
                entry = hash_table_find(pool_proto->id_tab, (void *)chkid);
                if (!entry) {
                        ret = ENOENT;
                        DWARN("get chunk "CHKID_FORMAT" @ "CHKID_FORMAT"\n",
                              CHKID_ARG(chkid), CHKID_ARG(&pool_proto->chkid));
                        GOTO(err_ret, ret);
                }

                memcpy(chkinfo, entry->chkinfo, CHKINFO_SIZE(entry->chkinfo->repnum));
        } else {
                /* Sub-chunks have no id-table entry to hand back. */
                YASSERT(_ent == NULL);

                ret = __pool_proto_chunk(pool_proto, chkid, chkinfo, NULL);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        }

        if (_ent != NULL) {
                *_ent = entry;
        }

        return 0;
err_ret:
        return ret;

}

/*
 * Hash-table iteration callback: append one directory record for @_ent to the
 * listing file described by @_arg (a listpool_arg_t).
 *
 * Each record is sizeof(struct dirent) + MAX_NAME_LEN bytes long; that length
 * is stored in d_reclen and arg->offset advances by it after a successful
 * write.
 *
 * The record is built in a padded local struct so the bytes handed to
 * _pwrite() all belong to one zeroed object. A bare "struct dirent" is only
 * sizeof(struct dirent) bytes, so writing d_reclen bytes from it would read
 * MAX_NAME_LEN bytes past the object and put stack garbage in the file.
 *
 * NOTE(review): ent->name is copied with strcpy(); this assumes names are
 * shorter than d_name — confirm against the name-length limit used on insert.
 *
 * Returns 0 on success or the positive errno derived from _pwrite().
 */
STATIC int __pool_proto_listpool_to_file(void *_arg, void *_ent)
{
        int ret;
        entry_t *ent = _ent;
        listpool_arg_t *arg = _arg;
        struct {
                struct dirent de;
                char pad[MAX_NAME_LEN];
        } rec;

        memset(&rec, 0x0, sizeof(rec));
        rec.de.d_reclen = sizeof(rec.de) + MAX_NAME_LEN;
        rec.de.d_off = arg->offset;

        strcpy(rec.de.d_name, ent->name);

        /* d_reclen <= sizeof(rec), so the write stays inside rec. */
        ret = _pwrite(arg->fd, &rec, rec.de.d_reclen, arg->offset);
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        arg->offset += rec.de.d_reclen;

        return 0;
err_ret:
        return ret;
}

/*
 * Write the terminating record of a pool listing at @offset in @fd.
 *
 * The terminator is an all-zero record with an empty d_name; like every other
 * record it is sizeof(struct dirent) + MAX_NAME_LEN bytes long.
 *
 * The record is built in a padded local struct so the bytes handed to
 * _pwrite() all belong to one zeroed object. A bare "struct dirent" is only
 * sizeof(struct dirent) bytes, so writing d_reclen bytes from it would read
 * MAX_NAME_LEN bytes past the object and put stack garbage in the file.
 *
 * Returns 0 on success or the positive errno derived from _pwrite().
 */
static int __poollist_write_end_flag(int fd, uint64_t offset)
{
        struct {
                struct dirent de;
                char pad[MAX_NAME_LEN];
        } rec;
        int ret;

        memset(&rec, 0x0, sizeof(rec));
        rec.de.d_name[0] = '\0';
        rec.de.d_reclen = (sizeof(rec.de) + MAX_NAME_LEN);
        rec.de.d_off = offset;

        /* d_reclen <= sizeof(rec), so the write stays inside rec. */
        ret = _pwrite(fd, &rec, rec.de.d_reclen, offset);
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Dump the pool's name table into @fd as a sequence of directory records,
 * followed by an end-of-list record.
 *
 * An ENOMEM result from the table iteration is logged and then ignored: the
 * end record is still written at the offset reached so far. Any other
 * iteration error aborts the listing.
 *
 * Returns 0 on success, otherwise the error from the iteration or from
 * writing the end record.
 */
STATIC int __pool_proto_listpool_open(pool_proto_t *pool_proto, int fd)
{
        int ret;
        listpool_arg_t ctx;

        ctx.offset = 0;
        ctx.fd = fd;

        ret = hash_iterate_table_entries(pool_proto->name_tab, __pool_proto_listpool_to_file, &ctx);
        if (unlikely(ret)) {
                if (ret != ENOMEM) {
                        GOTO(err_ret, ret);
                }

                DERROR("out of mem\n");
                SERROR(0, "%s, table iterate ret: %d\n", M_DATA_TABLE_ERROR, ret);
        }

        ret = __poollist_write_end_flag(fd, ctx.offset);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Fetch the chunk info of a child chunk @chkid into @chkinfo.
 *
 * @chkid must differ from the pool's own id (asserted). The lookup itself is
 * done by __pool_proto_getinfo() without returning the entry.
 *
 * Returns 0 on success or the error from __pool_proto_getinfo().
 */
STATIC int __pool_proto_chunk_getinfo(pool_proto_t *pool_proto, const chkid_t *chkid,
                                      chkinfo_t *chkinfo)
{
        int ret;

        YASSERT(chkid_cmp(chkid, &pool_proto->chkid));

        ret = __pool_proto_getinfo(pool_proto, chkid, NULL, chkinfo);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Copy the pool's fileinfo into @fileinfo.
 *
 * A stored size of 0 is reported as FAKE_BLOCK instead. Always returns 0.
 */
STATIC int __pool_proto_getattr(pool_proto_t *pool_proto, fileinfo_t *fileinfo)
{
        *fileinfo = pool_proto->fileinfo;

        if (!fileinfo->size) {
                fileinfo->size = FAKE_BLOCK;
        }

        return 0;
}

/*
 * Replace the stored chunk info of a child pool/volume with @chkinfo.
 *
 * The update is accepted only when:
 *   - the chunk is present in the id table;
 *   - @owner is either the first replica of the stored chunk info or the
 *     node returned by lease_get() for this chunk;
 *   - @info_version equals the stored info_version.
 *
 * Returns 0 on success, ENOENT for an unknown chunk, EPERM when the owner or
 * version check fails, or the error from __pool_proto_update().
 */
STATIC int __pool_proto_chunk_update(pool_proto_t *pool_proto, const chkinfo_t *chkinfo,
                                     const nid_t *owner, uint64_t info_version)
{
        int ret;
        nid_t holder;
        entry_t *entry;
        const chkid_t *chkid = &chkinfo->id;

        YASSERT(chkid->type == __VOLUME_CHUNK__ || chkid->type == __POOL_CHUNK__);
        YASSERT(chkid_cmp(chkid, &pool_proto->chkid));

        entry = hash_table_find(pool_proto->id_tab, (void *)chkid);
        if (!entry) {
                ret = ENOENT;
                GOTO(err_ret, ret);
        }

        /* Caller is not the first replica: fall back to the lease holder. */
        if (nid_cmp(owner, &entry->chkinfo->diskid[0].id)) {
                ret = lease_get(chkid, &holder, NULL);
                if (ret || nid_cmp(owner, &holder) != 0) {
                        DWARN("chunk "CHKID_FORMAT" info_version %llu : %llu\n",
                              CHKID_ARG(chkid), (LLU)entry->chkinfo->info_version, (LLU)info_version);
                        ret = EPERM;
                        GOTO(err_ret, ret);
                }

                DINFO("chunk "CHKID_FORMAT" info_version %llu : %llu\n",
                      CHKID_ARG(chkid), (LLU)entry->chkinfo->info_version, (LLU)info_version);
        }

        if (info_version != entry->chkinfo->info_version) {
                DWARN("chunk "CHKID_FORMAT" info_version %llu : %llu\n",
                      CHKID_ARG(chkid), (LLU)entry->chkinfo->info_version, (LLU)info_version);
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        ret = __pool_proto_update(pool_proto, entry, NULL, chkinfo);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        CHKINFO_DUMP(chkinfo, D_INFO);

        return 0;
err_ret:
        return ret;
}

/*
 * Set the status of replica @nid of chunk @chkid to @status.
 *
 * Only chunks whose id field matches the pool's own id are accepted. The
 * chunk info/stat are loaded with __pool_proto_chunk(), modified by
 * chunk_proto_set(), and written back only when chunk_proto_set() reports
 * that something changed. The write-back is passed sync = 0 for __S_CHECK
 * and sync = 1 for every other status.
 *
 * Returns 0 on success, EPERM for a foreign chunk id, or a callee's error.
 */
STATIC int __pool_proto_chunk_set(pool_proto_t *pool_proto, const chkid_t *chkid,
                                   const nid_t *nid, int status)
{
        int ret, changed;
        char info_buf[CHKINFO_MAX], stat_buf[CHKSTAT_MAX];
        chkinfo_t *chkinfo = (void *)info_buf;
        chkstat_t *chkstat = (void *)stat_buf;

        if (pool_proto->chkid.id != chkid->id) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        ret = __pool_proto_chunk(pool_proto, chkid, chkinfo, chkstat);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = chunk_proto_set(chkinfo, chkstat, nid, status, &changed);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (changed) {
                int sync = (status != __S_CHECK);

                ret = __pool_proto_update_chkinfo_ref(pool_proto, chkinfo, chkstat, sync);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        }

        CHKINFO_DUMP(chkinfo, D_INFO);

        return 0;
err_ret:
        return ret;
}

/*
 * Tell whether the pool needs to be checked/reloaded.
 *
 * Returns 1 when the cached ltime is 0, when network_connect() to the parent
 * node fails, or when the ltime it reports differs from the cached one.
 * Returns 0 otherwise.
 */
STATIC int __pool_proto_needreload(pool_proto_t *pool_proto)
{
        int ret;
        time_t peer_ltime;

        if (pool_proto->ltime != 0) {
                ret = network_connect(&pool_proto->parentnid, &peer_ltime, 1, 0);
                if (!ret && peer_ltime == pool_proto->ltime) {
                        return 0;
                }

                DINFO("table "CHKID_FORMAT" need check, ret %u ltime %u %u\n",
                      CHKID_ARG(&pool_proto->chkid), ret, (int)peer_ltime, (int)pool_proto->ltime);
                return 1;
        }

        DINFO("table "CHKID_FORMAT" need check\n", CHKID_ARG(&pool_proto->chkid));
        return 1;
}

/*
 * Bind the pool to its parent.
 *
 * Resolves the node serving @parent (the admin node when @parent is root,
 * otherwise via md_map_getsrv()), connects to it and records the ltime that
 * network_connect() reports. It then fetches the chunk info of @chkid from
 * the parent with md_chunk_getinfo1(); the fetched info itself is discarded
 * and only the call's success matters here. Finally @parent is stored as the
 * pool's parentid.
 *
 * Returns 0 on success or the first callee error.
 */
STATIC int __pool_proto_connect(pool_proto_t *pool_proto, const fileid_t *parent, const chkid_t *chkid)
{
        int ret;
        char info_buf[CHKINFO_MAX];
        chkinfo_t *chkinfo;

        if (!chkid_isroot(parent)) {
                ret = md_map_getsrv(parent, &pool_proto->parentnid);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        } else {
                pool_proto->parentnid = *net_getadmin();
        }

        ret = network_connect(&pool_proto->parentnid, &pool_proto->ltime, 1, 0);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        chkinfo = (void *)info_buf;
        ret = md_chunk_getinfo1(pool_proto->pool, parent, chkid, chkinfo, NULL);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* A non-root parent must itself be a pool. */
        if (!chkid_isroot(parent)) {
                YASSERT(parent->type == __POOL_CHUNK__);
        }

        pool_proto->parentid = *parent;

        return 0;
err_ret:
        return ret;
}

/*
 * Apply md_proto_reject() for node @bad to the chunk info of child @chkid
 * and persist the result.
 *
 * On return @chkinfo holds the child's chunk info as modified by
 * md_proto_reject(). Any failure of md_proto_reject() is reported as EPERM.
 *
 * Returns 0 on success, EPERM as above, or the error from the lookup or the
 * update.
 */
STATIC int __pool_proto_chunk_reject(pool_proto_t *pool_proto, const chkid_t *chkid,
                              const nid_t *bad, chkinfo_t *chkinfo)
{
        int ret;
        entry_t *entry;

        YASSERT(chkid->type == __VOLUME_CHUNK__ || chkid->type == __POOL_CHUNK__);
        YASSERT(chkid_cmp(chkid, &pool_proto->chkid));

        ret = __pool_proto_getinfo(pool_proto, chkid, &entry, chkinfo);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = md_proto_reject(chkinfo, bad);
        if (unlikely(ret)) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        ret = __pool_proto_update(pool_proto, entry, NULL, chkinfo);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        CHKINFO_DUMP(chkinfo, D_INFO);

        return 0;
err_ret:
        return ret;
}

/*
 * Set extended attribute @key = @value on the pool's own table.
 *
 * The pool chunk is checked before and after the operation. @valuelen is
 * accepted for signature compatibility and ignored; @flag is forwarded to
 * the table's xattr set handler.
 *
 * Returns 0 on success or the first callee error.
 */
STATIC int __pool_proto_xattr_set(pool_proto_t *pool_proto, const char *key, const char *value,
                                uint32_t valuelen, int flag)
{
        int ret;
        table_proto_t *tbl;

        (void) valuelen;

        ret = __pool_proto_chunk_check(pool_proto, &pool_proto->chkid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* Read the table pointer only after the check has completed. */
        tbl = pool_proto->table_proto;
        ret = tbl->xattr->set(tbl, key, value, flag);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __pool_proto_chunk_check(pool_proto, &pool_proto->chkid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Read extended attribute @key of the pool's own table into @value.
 *
 * @value is treated as a NUL-terminated string after the handler returns;
 * *@valuelen is set to its length including the terminator. The pool chunk
 * is checked before and after the operation.
 *
 * Returns 0 on success or the first callee error.
 */
STATIC int __pool_proto_xattr_get(pool_proto_t *pool_proto, const char *key, char *value,
                                int *valuelen)
{
        int ret;
        table_proto_t *tbl;

        ret = __pool_proto_chunk_check(pool_proto, &pool_proto->chkid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* Read the table pointer only after the check has completed. */
        tbl = pool_proto->table_proto;
        ret = tbl->xattr->get(tbl, key, value);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        *valuelen = strlen(value) + 1;

        ret = __pool_proto_chunk_check(pool_proto, &pool_proto->chkid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * List the extended attributes of the pool's own table into @buf.
 *
 * @buf is treated as a NUL-terminated string after the handler returns;
 * *@buflen is set to its length including the terminator. The pool chunk is
 * checked before and after the operation.
 *
 * Returns 0 on success or the first callee error.
 */
STATIC int __pool_proto_xattr_list(pool_proto_t *pool_proto, char *buf, int *buflen)
{
        int ret;
        table_proto_t *tbl;

        ret = __pool_proto_chunk_check(pool_proto, &pool_proto->chkid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* Read the table pointer only after the check has completed. */
        tbl = pool_proto->table_proto;
        ret = tbl->xattr->list(tbl, buf);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        *buflen = strlen(buf) + 1;

        ret = __pool_proto_chunk_check(pool_proto, &pool_proto->chkid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Remove extended attribute @key from the pool's own table.
 *
 * The pool chunk is checked before and after the operation.
 *
 * Returns 0 on success or the first callee error.
 */
STATIC int __pool_proto_xattr_remove(pool_proto_t *pool_proto, const char *key)
{
        int ret;
        table_proto_t *tbl;

        ret = __pool_proto_chunk_check(pool_proto, &pool_proto->chkid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* Read the table pointer only after the check has completed. */
        tbl = pool_proto->table_proto;
        ret = tbl->xattr->remove(tbl, key);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __pool_proto_chunk_check(pool_proto, &pool_proto->chkid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Move the replicas of the pool chunk itself, or of one of its sub-chunks,
 * to the nodes listed in @dist (@dist_count entries).
 *
 * @chkid must be the pool's own id or a __POOL_SUB_CHUNK__ (asserted).
 * Sequence: load chunk info/stat, run chunk_proto_rep_check(), persist,
 * run chunk_proto_rep_move(), persist again.
 *
 * The parent passed to the replica calls is the pool's parentid when @chkid
 * is a pool chunk, and the pool's own id otherwise.
 *
 * Returns 0 on success or the first callee error.
 */
STATIC int __pool_proto_chunk_move(pool_proto_t *pool_proto, const chkid_t *chkid,
                                   const nid_t *dist, int dist_count)
{
        int ret;
        char info_buf[CHKINFO_MAX], stat_buf[CHKSTAT_MAX];
        chkinfo_t *chkinfo = (void *)info_buf;
        chkstat_t *chkstat = (void *)stat_buf;
        const chkid_t *parent;

        YASSERT(chkid_cmp(chkid, &pool_proto->chkid) == 0 || chkid->type == __POOL_SUB_CHUNK__);

        ret = __pool_proto_chunk(pool_proto, chkid, chkinfo, chkstat);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        CHKINFO_DUMP(chkinfo, D_INFO);

        parent = (chkid->type == __POOL_CHUNK__)
                ? &pool_proto->parentid
                : &pool_proto->chkid;

        ret = chunk_proto_rep_check(pool_proto->pool, chkinfo, chkstat, NULL, parent, 1,
                &pool_proto->lease.token, NULL, NULL);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* Persist whatever the check changed before starting the move. */
        ret = __pool_proto_update_chkinfo_ref(pool_proto, chkinfo, chkstat, 1);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = chunk_proto_rep_move(pool_proto->pool, chkinfo, chkstat, NULL, parent, dist, dist_count,
                &pool_proto->lease.token, NULL);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __pool_proto_update_chkinfo_ref(pool_proto, chkinfo, chkstat, 1);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Synchronise the replicas of the pool chunk itself, or of one of its
 * sub-chunks, and persist the resulting chunk info/stat.
 *
 * Any other @chkid is rejected with EINVAL.
 *
 * The parent passed to chunk_proto_rep_sync() is the pool's parentid when
 * @chkid is a pool chunk, and the pool's own id otherwise.
 *
 * Returns 0 on success, EINVAL as above, or the first callee error.
 */
STATIC int __pool_proto_chunk_sync(pool_proto_t *pool_proto, const chkid_t *chkid)
{
        int ret;
        char info_buf[CHKINFO_MAX], stat_buf[CHKSTAT_MAX];
        chkinfo_t *chkinfo = (void *)info_buf;
        chkstat_t *chkstat = (void *)stat_buf;
        const chkid_t *parent;

        DINFO("chkid "CHKID_FORMAT" --> "CHKID_FORMAT"\n",
              CHKID_ARG(chkid), CHKID_ARG(&pool_proto->chkid));

        /* Reject foreign ids with an error instead of asserting. */
        if (chkid_cmp(chkid, &pool_proto->chkid) != 0 && chkid->type != __POOL_SUB_CHUNK__) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        ret = __pool_proto_chunk(pool_proto, chkid, chkinfo, chkstat);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        CHKINFO_DUMP(chkinfo, D_INFO);

        parent = (chkid->type == __POOL_CHUNK__)
                ? &pool_proto->parentid
                : &pool_proto->chkid;

        ret = chunk_proto_rep_sync(pool_proto->pool, chkinfo, chkstat, NULL, parent, 1,
                &pool_proto->lease.token, NULL, NULL);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __pool_proto_update_chkinfo_ref(pool_proto, chkinfo, chkstat, 1);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Hash-table iteration adapter: forwards one entry's chunk info to the
 * user callback stored in the chunk_iterator_arg_t context.
 *
 * The callback's return value is not inspected; this function always
 * returns 0.
 */
STATIC int __pool_proto_chunk_iterator__(void *_context, void *_ent)
{
        chunk_iterator_arg_t *ctx = _context;
        entry_t *entry = _ent;

        ctx->func2(ctx->arg, &ctx->parent, entry->chkinfo);

        return 0;
}

/*
 * Invoke @func2 for every chunk that backs this pool's own tables.
 *
 * @func2 is called as func2(_arg, &pool_proto->chkid, chkinfo), first for
 * each of the table_count entries in table_array and then once for the
 * pool's main table_proto. Child pools/volumes in id_tab are NOT visited;
 * that code is compiled out below.
 *
 * The return value of @func2 is ignored. Always returns 0.
 */
STATIC int __pool_proto_chunk_iterator(pool_proto_t *pool_proto, func2_t func2, void *_arg)
{
        // int ret;
        table_proto_t *table_proto;

        // TODO isroot
        /* Disabled: special handling when the pool sits directly under root. */
#if 0
        if (chkid_isroot(&pool_proto->parentid)) {
                DINFO("check root chunk\n");

                table_proto = pool_proto->table_proto;
                func2(_arg, &pool_proto->parentid, table_proto->chkinfo);
        }
#endif

        // sub-chunks: one per extension table of this pool
        for (int i = 0; i < pool_proto->table_count; i++) {
                table_proto = pool_proto->table_array[i];
                YASSERT(table_proto);
                func2(_arg, &pool_proto->chkid, table_proto->chkinfo);
        }

        // the pool chunk itself
        func2(_arg, &pool_proto->chkid, pool_proto->table_proto->chkinfo);

        /* Disabled: iteration over the children registered in id_tab. */
#if 0
        chunk_iterator_arg_t arg;
        arg.func2 = func2;
        arg.arg = _arg;
        arg.parent = pool_proto->chkid;

        // children
        ret = hash_iterate_table_entries(pool_proto->id_tab, __pool_proto_chunk_iterator__, &arg);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);
#endif

        return 0;
}

/*
 * Look up the chkinfo this pool tracks for chkid and pass it to
 * md_proto_chunk_cleanup() together with nid and meta_version.
 *
 * chkid may name the pool chunk itself, one of its sub-table chunks
 * (resolved through table_array by index) or a child registered in id_tab.
 *
 * Returns 0 on success, ENOENT when the chunk is not known to this pool,
 * or the error reported by md_proto_chunk_cleanup().
 */
STATIC int __pool_proto_chunk_cleanup(pool_proto_t *pool_proto, const chkid_t *chkid,
                                     const nid_t *nid, uint64_t meta_version)
{
        int ret;
        entry_t *ent;
        table_proto_t *table_proto;
        const chkinfo_t *chkinfo;

        DINFO("cleanup "CHKID_FORMAT"\n", CHKID_ARG(chkid));

        YASSERT(chkid->type == __VOLUME_CHUNK__
                || chkid->type == __POOL_CHUNK__
                || chkid->type == __POOL_SUB_CHUNK__);

        if (chkid_cmp(chkid, &pool_proto->chkid) == 0) {
                table_proto = pool_proto->table_proto;
                chkinfo = table_proto->chkinfo;
        } else if (chkid->type == __POOL_SUB_CHUNK__) {
                /*
                 * Compare as unsigned 64-bit values (the same form used by
                 * __pool_proto_load_extern): narrowing idx to int first
                 * would let a very large index turn negative, pass the
                 * check and index table_array out of bounds.
                 */
                if ((uint64_t)chkid->idx >= (uint64_t)pool_proto->table_count) {
                        ret = ENOENT;
                        GOTO(err_ret, ret);
                }

                table_proto = pool_proto->table_array[chkid->idx];

                if (table_proto == NULL) {
                        ret = ENOENT;
                        DERROR("cleanup "CHKID_FORMAT" @ "CHKID_FORMAT" fail\n",
                               CHKID_ARG(chkid), CHKID_ARG(&pool_proto->chkid));
                        GOTO(err_ret, ret);
                }

                chkinfo = table_proto->chkinfo;
        } else {
                ent = hash_table_find(pool_proto->id_tab, (void *)chkid);
                if (ent == NULL) {
                        ret = ENOENT;
                        DERROR("cleanup "CHKID_FORMAT" @ "CHKID_FORMAT" fail\n",
                               CHKID_ARG(chkid), CHKID_ARG(&pool_proto->chkid));
                        GOTO(err_ret, ret);
                }

                chkinfo = ent->chkinfo;
        }

        ret = md_proto_chunk_cleanup(chkinfo, nid, meta_version);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Allocate and initialise an empty pool_proto for chunk chkid in pool `pool`.
 *
 * Creates the name and id hash tables, fills in the operation table and
 * creates the lease object. Nothing is loaded from storage here.
 *
 * _pool_proto: receives the new object on success; left untouched on failure.
 *
 * Returns 0 on success, ENAMETOOLONG when `pool` does not fit into
 * pool_proto->pool, ENOMEM when a hash table cannot be created, or the
 * error returned by ymalloc() / lease_create().
 */
STATIC int __pool_proto_new(pool_proto_t **_pool_proto, const chkid_t *chkid, const char *pool)
{
        int ret;
        pool_proto_t *pool_proto;
        char name[MAX_NAME_LEN];

        ret = ymalloc((void **)&pool_proto, sizeof(*pool_proto));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(pool_proto, 0x0, sizeof(*pool_proto));

        /* refuse names that would overflow the embedded buffer */
        if (strlen(pool) >= sizeof(pool_proto->pool)) {
                ret = ENAMETOOLONG;
                GOTO(err_free, ret);
        }

        strcpy(pool_proto->pool, pool);

        snprintf(name, sizeof(name), "dir_name:"CHKID_FORMAT"", CHKID_ARG(chkid));
        pool_proto->name_tab = hash_create_table(__name_cmp, __name_key, name);
        if (pool_proto->name_tab == NULL) {
                ret = ENOMEM;
                GOTO(err_free, ret);
        }

        snprintf(name, sizeof(name), "dir_id:"CHKID_FORMAT"", CHKID_ARG(chkid));
        pool_proto->id_tab = hash_create_table(__id_cmp, __id_key, name);
        if (pool_proto->id_tab == NULL) {
                ret = ENOMEM;
                GOTO(err_free, ret);
        }

        pool_proto->needreload = __pool_proto_needreload;

        pool_proto->setattr = __pool_proto_setattr;
        pool_proto->getattr = __pool_proto_getattr;

        pool_proto->listpool_open = __pool_proto_listpool_open;
        pool_proto->lookup = __pool_proto_lookup;

        pool_proto->mkpool = __pool_proto_mkpool;
        pool_proto->mkvol = __pool_proto_mkvol;
        pool_proto->mkvolwith = __pool_proto_mkvolwith;
        pool_proto->rmpool = __pool_proto_rmpool;
        pool_proto->unlink = __pool_proto_unlink;
        pool_proto->cleanup = __pool_proto_cleanup;

        pool_proto->newtab = __pool_proto_newtable;

        pool_proto->rename = pool_rename1;
        pool_proto->rename_lock = pool_rename_lock;
        pool_proto->rename_unlock = pool_rename_unlock;

        pool_proto->xattr_set = __pool_proto_xattr_set;
        pool_proto->xattr_get = __pool_proto_xattr_get;
        pool_proto->xattr_list = __pool_proto_xattr_list;
        pool_proto->xattr_remove = __pool_proto_xattr_remove;

        pool_proto->chunk_reject = __pool_proto_chunk_reject;
        pool_proto->chunk_set = __pool_proto_chunk_set;
        pool_proto->chunk_check = __pool_proto_chunk_check;
        pool_proto->chunk_sync = __pool_proto_chunk_sync;
        pool_proto->chunk_move = __pool_proto_chunk_move;
        pool_proto->chunk_update = __pool_proto_chunk_update;
        pool_proto->chunk_cleanup = __pool_proto_chunk_cleanup;
        pool_proto->chunk_getinfo = __pool_proto_chunk_getinfo;
        pool_proto->chunk_iterator = __pool_proto_chunk_iterator;

        pool_proto->chkid = *chkid;
        pool_proto->ltime = 0;
        pool_proto->name_locked = 0;

        ret = lease_create(&pool_proto->lease, chkid);
        if (unlikely(ret))
                GOTO(err_free, ret);

        *_pool_proto = pool_proto;

        return 0;
err_free:
        /*
         * Release whichever hash tables were already created before freeing
         * the struct. __pool_proto_destroy() NULL-checks every member, so it
         * is safe on the zero-filled, partially built object.
         */
        __pool_proto_destroy(pool_proto);
        yfree((void **)&pool_proto);
err_ret:
        return ret;
}

/*
 * Table iterator callback: register one stored item in the pool's lookup
 * tables.
 *
 * value: raw item; the name starts at offset 0 and the chkinfo starts at
 *        DIR_PROTO_EXTERN_ITEM_SIZE / 2.
 * _loc, _idx: recorded in the new entry's loc.
 * ctx:   the owning pool_proto_t.
 *
 * A failed hash insert is logged and deliberately not reported to the
 * iterator (0 is returned). Only an allocation failure in
 * __pool_proto_alloc() is returned as an error.
 */
STATIC int __pool_proto_load_extern_callback(const void *value, int _loc, int _idx, void *ctx)
{
        int ret;
        pool_proto_t *pool_proto;
        const chkinfo_t *chkinfo;
        const char *name;
        entry_t *ent;
        loc_t loc;

        pool_proto = ctx;
        name = value;
        /* step in bytes: arithmetic directly on a void * is a compiler extension */
        chkinfo = (const void *)((const char *)value + DIR_PROTO_EXTERN_ITEM_SIZE / 2);

        DINFO("insert %s id "CHKID_FORMAT" into "CHKID_FORMAT"\n",
              name, CHKID_ARG(&chkinfo->id), CHKID_ARG(&pool_proto->chkid));

        loc.loc = _loc;
        //loc.idx = chkinfo->id.idx;
        loc.idx = _idx;
        ret = __pool_proto_alloc(&ent, name, chkinfo, &loc);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        YASSERT(chkinfo->id.type == __POOL_CHUNK__ || chkinfo->id.type == __VOLUME_CHUNK__);

        ret = hash_table_insert(pool_proto->name_tab, (void *)ent, ent->name, 0);
        if (unlikely(ret)) {
                DERROR("key %s\n", ent->name);
                SERROR(0, "%s, table insert key %s, ret: %d\n", M_DATA_TABLE_ERROR,  ent->name, ret);
                /*
                 * The entry is in neither table, so nothing would ever
                 * release it; free it member by member, as
                 * __pool_proto_free() does.
                 * NOTE(review): assumes a failed hash_table_insert() leaves
                 * the table without a reference to ent - confirm.
                 */
                yfree((void **)&ent->name);
                yfree((void **)&ent->chkinfo);
                yfree((void **)&ent);
                goto out;
                //YASSERT(0);
        }

        ret = hash_table_insert(pool_proto->id_tab, (void *)ent, &ent->chkinfo->id, 0);
        if (unlikely(ret)) {
                DERROR("key %s\n", ent->name);
                SERROR(1, "%s, table insert key %s, ret: %d\n", M_DATA_TABLE_ERROR,  ent->name, ret);
                /* ent stays referenced by name_tab, so it must not be freed here */
                goto out;
                //YASSERT(0);
        }

out:
        return 0;
err_ret:
        return ret;
}

/*
 * Make sure table_array has a slot for sub-table tableid.
 *
 * An index already inside the array needs no work. An index equal to the
 * current table_count grows the array by one slot, but only for
 * __OP_WRITE; any other op yields ENOENT. Asserts that the index is never
 * more than one past the end. A failed reallocation is treated as fatal.
 *
 * Returns 0 on success or ENOENT.
 */
STATIC int __pool_proto_extend(pool_proto_t *pool_proto, const chkid_t *tableid, int op)
{
        int ret, slot;

        YASSERT(tableid->type == __POOL_SUB_CHUNK__);
        slot = tableid->idx;

        /* slot already exists: nothing to do */
        if (slot < pool_proto->table_count)
                return 0;

        YASSERT(slot == pool_proto->table_count);

        if (op != __OP_WRITE) {
                ret = ENOENT;
                GOTO(err_ret, ret);
        }

        ret = yrealloc((void **)&pool_proto->table_array,
                       sizeof(table_proto_t *) * pool_proto->table_count,
                       sizeof(table_proto_t *) * (slot + 1));
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        pool_proto->table_count = slot + 1;

        return 0;
err_ret:
        return ret;
}

/*
 * Load one sub-table described by __chkinfo and store it in
 * table_array[__chkinfo->id.idx]. The slot must already exist.
 *
 * When `new` is zero, the table's items are also walked and registered in
 * the pool's lookup tables through __pool_proto_load_extern_callback().
 * The chunk is checked before loading and again afterwards, using the
 * chkinfo/chkstat held by the loaded table.
 *
 * Returns 0 on success or the first error met. If the item walk fails, the
 * freshly loaded table is destroyed before returning.
 */
STATIC int __pool_proto_load_extern(pool_proto_t *pool_proto, const chkinfo_t *__chkinfo, int new)
{
        int ret;
        char statbuf[CHKSTAT_MAX], infobuf[CHKINFO_MAX];
        chkstat_t *chkstat = (void *)statbuf;
        chkinfo_t *chkinfo = (void *)infobuf;
        const chkid_t *tableid = &__chkinfo->id;
        table_proto_t *table;

        YASSERT((uint64_t)pool_proto->table_count > tableid->idx);

        /* work on private copies: the checks below may update them */
        memset(chkstat, 0x0, CHKSTAT_SIZE(__chkinfo->repnum));
        CHKINFO_CP(chkinfo, __chkinfo);

        ret = __pool_proto_pre_load_check__(pool_proto, chkinfo, chkstat, __OP_READ);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = table_proto_load(&table, chkinfo, chkstat,
                               DIR_PROTO_EXTERN_ITEM_SIZE,
                               DIR_PROTO_EXTERN_ITEM_COUNT, 0, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (!new) {
                ret = table->iterator(table, __pool_proto_load_extern_callback,
                                      tableid->idx, pool_proto);
                if (unlikely(ret))
                        GOTO(err_table, ret);
        }

        /* from here on the pool owns the table */
        pool_proto->table_array[tableid->idx] = table;

        CHKINFO_CP(chkinfo, table->chkinfo);
        CHKSTAT_CP(chkstat, table->chkstat, chkinfo->repnum);
        ret = __pool_proto_load_check__(pool_proto, chkinfo,
                                        chkstat, __OP_READ);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        YASSERT(table->chkid.type == __POOL_SUB_CHUNK__);

        return 0;
err_table:
        table_proto_destroy(table);
err_ret:
        return ret;
}

/*
 * Iterator callback for the pool's main table: each item is the chkinfo of
 * one sub-table. Grows table_array if needed, then loads that sub-table
 * together with its entries.
 *
 * loc and idx are unused. Returns 0 or the first error met.
 */
STATIC int __pool_proto_load_callback(const void *value, int loc, int idx, void *ctx)
{
        int ret;
        pool_proto_t *owner = ctx;
        const chkinfo_t *subinfo = value;

        (void) loc;
        (void) idx;

        ret = __pool_proto_extend(owner, &subinfo->id, __OP_WRITE);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __pool_proto_load_extern(owner, subinfo, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Build directory attributes for a pool that has none stored, and write
 * them through __pool_proto_setattr__().
 *
 * For a pool whose parent is the root, the replica count is
 * gloconf.chunk_rep. Otherwise it is the parent's repnum_usr, read locally
 * or over RPC depending on where the parent lives.
 *
 * Returns 0 on success or the first error met.
 */
STATIC int __pool_proto_attr_force(pool_proto_t *pool_proto, table_proto_t *table_proto)
{
        int ret;
        const nid_t *parent_nid;
        fileinfo_t parent_info;
        setattr_t attr;

        if (!chkid_isroot(&pool_proto->parentid)) {
                parent_nid = &pool_proto->parentnid;

                if (net_islocal(parent_nid)) {
                        DBUG("chunk %s %p\n", id2str(&pool_proto->parentid), &pool_proto->parentid);
                        ret = stor_ctl_getattr(&pool_proto->parentid, &parent_info);
                } else {
                        ret = stor_rpc_getattr(parent_nid, &pool_proto->parentid, &parent_info);
                }

                if (unlikely(ret))
                        GOTO(err_ret, ret);

                /* the parent must carry a user replica count to inherit */
                YASSERT(parent_info.repnum_usr);
                md_initattr(&attr, __S_IFDIR, parent_info.repnum_usr);
        } else {
                md_initattr(&attr, __S_IFDIR, gloconf.chunk_rep);
        }

        ret = __pool_proto_setattr__(pool_proto, table_proto, &attr, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Read the attribute record stored in table_proto into
 * pool_proto->fileinfo.
 *
 * When the table reports an empty record (count == 0), default attributes
 * are generated and written through __pool_proto_attr_force().
 *
 * Returns 0 on success or the first error met.
 */
STATIC int __pool_proto_loadattr(pool_proto_t *pool_proto, table_proto_t *table_proto)
{
        int ret, count;
        char buf[LICH_BLOCK_SIZE];

        /*
         * Start from a zeroed buffer: fileinfo is copied out of it before
         * the length is inspected, so an empty or short record would
         * otherwise put uninitialised stack bytes into pool_proto->fileinfo.
         */
        memset(buf, 0x0, sizeof(buf));

        //count = sizeof(pool_proto->fileinfo);
        count = LICH_BLOCK_SIZE;
        ret = table_proto->getinfo(table_proto, buf, &count, TABLE_PROTO_INFO_ATTR);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        memcpy(&pool_proto->fileinfo, buf, sizeof(pool_proto->fileinfo));

        if (count == 0) {
                DWARN("load empty info "CHKID_FORMAT"\n", CHKID_ARG(&pool_proto->chkid));
                ret = __pool_proto_attr_force(pool_proto, table_proto);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Load the pool described by __chkinfo into pool_proto.
 *
 * Steps, in order: pre-load check of the chunk, load of the main table,
 * attribute load, load of every sub-table via the main table's iterator,
 * replay of pending rmvol / rename / rename-lock state, post-load check,
 * and a final comparison with the chkinfo returned by md_chunk_getinfo1().
 * The lease is renewed between the long steps.
 *
 * Returns 0 on success. EREMCHG when the first replica is no longer local
 * after the pre-load check, EAGAIN when the post-load check changed
 * info_version (the caller has to reload), otherwise the first error met.
 * On any failure after the main table was loaded, the pool's loaded state
 * is torn down with __pool_proto_destroy().
 */
STATIC int __pool_proto_load(pool_proto_t *pool_proto, const char *pool, const chkinfo_t *__chkinfo)
{
        int ret;
        table_proto_t *table_proto;
        chkstat_t *chkstat;
        chkinfo_t *chkinfo;
        char _chkstat[CHKSTAT_MAX], _chkinfo[CHKINFO_MAX];

        ANALYSIS_BEGIN(0);

        chkstat = (void *)_chkstat;
        chkinfo = (void *)_chkinfo;
        memset(chkstat, 0x0, CHKSTAT_SIZE(__chkinfo->repnum));
        CHKINFO_CP(chkinfo, __chkinfo);

        ret = __pool_proto_pre_load_check__(pool_proto, chkinfo, chkstat, __OP_READ);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /**
         * Bug #11186 maybe the master replica already recovery to another node by __pool_proto_pre_load_check__.
         * Are we allow this happen? like disk lost.
         */
        //YASSERT(net_islocal(&chkinfo->diskid[0].id));
        if (!net_islocal(&chkinfo->diskid[0].id)) {
                ret = EREMCHG;
                GOTO(err_ret, ret);
        }

        ret = table_proto_load(&table_proto, chkinfo, chkstat,
                               DIR_PROTO_ITEM_SIZE, DIR_PROTO_ITEM_COUNT, 1, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /*
         * Until table_proto is attached to pool_proto below,
         * __pool_proto_destroy() cannot see it, so failures in this window
         * go through err_table to release it explicitly.
         */
        ret = lease_set(&pool_proto->lease);
        if (unlikely(ret))
                GOTO(err_table, ret);

        ret = __pool_proto_loadattr(pool_proto, table_proto);
        if (unlikely(ret))
                GOTO(err_table, ret);

        pool_proto->table_proto = table_proto;//ugly

        ret = table_proto->iterator(table_proto, __pool_proto_load_callback, -1, pool_proto);
        if (unlikely(ret))
                GOTO(err_free, ret);

        DINFO("pool "CHKID_FORMAT" table count %u\n", CHKID_ARG(&pool_proto->chkid),
              pool_proto->table_count);

        ret = lease_set(&pool_proto->lease);
        if (unlikely(ret))
                GOTO(err_free, ret);

        ret = pool_rmvol_load(pool_proto);
        if (unlikely(ret))
                GOTO(err_free, ret);

        ret = pool_rename_load(pool_proto);
        if (unlikely(ret))
                GOTO(err_free, ret);

        ret = pool_rename_load_lock(pool_proto);
        if (unlikely(ret))
                GOTO(err_free, ret);

        CHKINFO_CP(chkinfo, table_proto->chkinfo);
        CHKSTAT_CP(chkstat, table_proto->chkstat, chkinfo->repnum);
        uint64_t info_version = chkinfo->info_version;
        ret = __pool_proto_load_check__(pool_proto, chkinfo,
                                        chkstat, __OP_READ);
        if (unlikely(ret))
                GOTO(err_free, ret);

        /* the check modified the chunk info: what was loaded is stale */
        if (info_version != chkinfo->info_version) {
                ret = EAGAIN;
                DWARN("pool "CHKID_FORMAT" need reload\n", CHKID_ARG(&pool_proto->chkid));
                GOTO(err_free, ret);
        }

        ret = lease_set(&pool_proto->lease);
        if (unlikely(ret))
                GOTO(err_free, ret);

        CHKINFO_DUMP(table_proto->chkinfo, D_INFO);

        ret = md_chunk_getinfo1(pool, &pool_proto->parentid, &pool_proto->chkid, chkinfo, NULL);
        if (ret)
                GOTO(err_free, ret);

        YASSERT(memcmp(table_proto->chkinfo, chkinfo, sizeof(*chkinfo)) == 0);
        ANALYSIS_END(0, IO_WARN, id2str(&chkinfo->id));

        return 0;
err_table:
        /* table was loaded but never attached to pool_proto */
        table_proto_destroy(table_proto);
err_free:
        __pool_proto_destroy(pool_proto);
err_ret:
        //pool_proto->ltime = 0;
        return ret;
}

/*
 * No-op element destructor. Passed to hash_destroy_table() for name_tab so
 * that tearing that table down does not release the entries themselves.
 */
static void  __pool_proto_nonfree(void *args)
{
        /* intentionally empty */
        (void) args;
}

static void  __pool_proto_free(void *args)
{
        entry_t *ent = args;

        yfree((void **)&ent->name);
        yfree((void **)&ent->chkinfo);
        yfree((void **)&ent);
}

/*
 * Release everything loaded into pool_proto, but not the struct itself.
 *
 * Both hash tables are destroyed; the entries are freed once, while
 * tearing down id_tab. The main table, every loaded sub-table and
 * table_array are released, and ltime is reset. Each member is set back to
 * NULL / 0, so calling this again on the same object is harmless.
 */
static void __pool_proto_destroy(pool_proto_t *pool_proto)
{
        int slot;
        table_proto_t *sub;

        /* name_tab only references the entries; id_tab frees them */
        if (pool_proto->name_tab) {
                hash_destroy_table(pool_proto->name_tab, __pool_proto_nonfree);
                pool_proto->name_tab = NULL;
        }

        if (pool_proto->id_tab) {
                hash_destroy_table(pool_proto->id_tab, __pool_proto_free);
                pool_proto->id_tab = NULL;
        }

        pool_proto->ltime = 0;

        if (pool_proto->table_proto) {
                table_proto_destroy(pool_proto->table_proto);
                pool_proto->table_proto = NULL;
        }

        if (pool_proto->table_array == NULL)
                return;

        /* slots may be empty when a sub-table was never loaded */
        for (slot = 0; slot < pool_proto->table_count; slot++) {
                sub = pool_proto->table_array[slot];
                if (sub)
                        table_proto_destroy(sub);
        }

        yfree((void **)&pool_proto->table_array);
        pool_proto->table_array = NULL;
        pool_proto->table_count = 0;
}

/*
 * Tear down a pool_proto and free the struct itself.
 *
 * pool_proto: object to destroy; NULL is accepted and ignored, mirroring
 *             free(). The pointer must not be used after this call.
 */
void pool_proto_destroy(pool_proto_t *pool_proto)
{
        /* nothing to release; avoids dereferencing NULL below */
        if (pool_proto == NULL)
                return;

        __pool_proto_destroy(pool_proto);

        yfree((void **)&pool_proto);
}

/*
 * Create a pool_proto for the pool chunk described by chkinfo and load it.
 *
 * _pool_proto: receives the loaded object on success; left untouched on
 *              failure.
 * pool:        pool name, stored in the new object.
 * parent:      chunk id of the parent, passed to __pool_proto_connect().
 * chkinfo:     chunk info of the pool; its first replica must be local.
 *
 * The lease is taken before connecting and loading, and renewed once more
 * at the end. The parent connection is established both before and after
 * the load.
 *
 * Returns 0 on success, EREMCHG when the first replica is not local,
 * otherwise the first error met. On failure the partially built object is
 * destroyed.
 */
int pool_proto_load(pool_proto_t **_pool_proto, const char *pool,
                    const chkid_t *parent, const chkinfo_t *chkinfo)
{
        int ret;
        pool_proto_t *pool_proto;
        const chkid_t *chkid;

        if (!net_islocal(&chkinfo->diskid[0].id)) {
                ret = EREMCHG;
                GOTO(err_ret, ret);
        }

        chkid = &chkinfo->id;

        ANALYSIS_BEGIN(0);

        DBUG("load "CHKID_FORMAT" begin\n", CHKID_ARG(&chkinfo->id));

        YASSERT(chkid->type == __POOL_CHUNK__);

        ret = __pool_proto_new(&pool_proto, chkid, pool);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = lease_set(&pool_proto->lease);
        if (unlikely(ret)) {
#if 0
                /* the last lease not timeout,just try again, do not need md_map_drop & locator_rpc_lookup */
                if (ret == EREMCHG)
                        ret = EAGAIN;
#endif
                GOTO(err_free, ret);
        }

        ret = __pool_proto_connect(pool_proto, parent, chkid);
        if (unlikely(ret))
                GOTO(err_free, ret);

        ret = __pool_proto_load(pool_proto, pool, chkinfo);
        if (unlikely(ret)) {
                GOTO(err_conn, ret);
        }

        ret = __pool_proto_connect(pool_proto, parent, chkid);//connect twice
        if (unlikely(ret))
                GOTO(err_lease, ret);

        pool_proto->uptime = gettime();

        ANALYSIS_END(0, 1000 * 100, NULL);

        ret = lease_set(&pool_proto->lease);
        if (unlikely(ret))
                GOTO(err_lease, ret);

        /*
         * Publish the object only after the last step that can fail;
         * otherwise a failed final lease_set() would leave the caller
         * holding a pointer to memory freed in the error path below.
         */
        *_pool_proto = pool_proto;

        DINFO("load "CHKID_FORMAT" success\n", CHKID_ARG(&chkinfo->id));

        return 0;
err_lease:
        lease_free(&pool_proto->lease);
err_conn:
        pool_proto->ltime = 0;
err_free:
        pool_proto_destroy(pool_proto);
err_ret:
        DBUG("load "CHKID_FORMAT" fail, ret:%d\n",
             CHKID_ARG(&chkinfo->id), ret);
        SWARN(0, "%s load "CHKID_FORMAT" fail, ret:%d\n", M_DATA_POOL_WARN,
              CHKID_ARG(&chkinfo->id), ret);
        return ret;
}


/*
 * Renew the lease held by pool_proto.
 *
 * Returns 0 on success or the error reported by lease_set().
 */
int pool_proto_renew(pool_proto_t *pool_proto)
{
        int ret = lease_set(&pool_proto->lease);

        if (ret)
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

